精华内容
下载资源
问答
  • RabbitMQ发布订阅模式

    2021-05-05 20:49:34
    X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费 相关场景:邮件群发,群...

    流程示例

    在这里插入图片描述

    特点

    1.一个生产者
    2.多个消费者
    3.多个队列。
    4.交换机 转发消息。

    X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
    相关场景:邮件群发,群聊天,广播(广告)

    保证所有消费者都可以拿到信息

    代码实例

    生产者:

    
        
        import com.rabbitmq.client.BuiltinExchangeType;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
    
        public class Product {
            public static void main(String[] args)throws  Exception {
                //创建连接工厂 --配置连接信息
                ConnectionFactory factory=new ConnectionFactory();
                factory.setHost("IP地址");
                //创建连接对象Connection
                Connection connection=factory.newConnection();
                //创建信道
                Channel channel = connection.createChannel();
        
                //创建队列
                /**
                 * String queue, 队列的名称
                 * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。
                 * boolean exclusive, 是否独占 false
                 * boolean autoDelete, 是否自动删除  如果长时间没有发生消息 则自动删除
                 * Map<String, Object> arguments 额外参数  先给null
                 */
                channel.queueDeclare("队列1",true,false,false,null);
                channel.queueDeclare("队列2",true,false,false,null);
        
                //创建交换机
                /**
                 * String exchange,交换机的名称
                 * BuiltinExchangeType type, 交换机的类型
                 * boolean durable:是否持久化
                 */
                channel.exchangeDeclare("交换机", BuiltinExchangeType.FANOUT,true);
                /**
                 * String queue,  队列名
                 * String exchange, 交换机的名称
                 * String routingKey:路由key 如果交换机为fanout模式则不需要路由key
                 */
                channel.queueBind("队列1","交换机","");
                channel.queueBind("队列2","交换机","");
                //发生消息
                /**
                 * String exchange: 交换机的名称 如果没有则使用“” 它回自动采用默认
                 * String routingKey, 路由key  如果没有交换机的绑定 使用队列的名称
                 * BasicProperties props, 消息的一些额外配置 目前先不加 null
                 * byte[] body 消息的内容
                 */
                for(int i=0;i<10;i++) {
                    String msg = "所有消费者都可以拿到信息"+i;
                    channel.basicPublish("绑定交换机", "", null, msg.getBytes());
                }
                //生产者这里可以管理资源  消费者不能关闭资源。
                channel.close();
                connection.close();
        
            }
        }
        
    

    消费者:

        
        import java.io.IOException;
    
        public class Consumer01 {
            public static void main(String[] args) throws Exception{
                //创建连接工厂 --配置连接信息
                ConnectionFactory factory=new ConnectionFactory();
                factory.setHost("IP地址");
                //创建连接对象Connection
                Connection connection=factory.newConnection();
                //创建信道
                Channel channel = connection.createChannel();
        
                //接受消息
                /**
                 * (String queue, 队列的名称
                 *  boolean autoAck, 是否自动确认
                 *  Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
                 */
                DefaultConsumer callback=new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //body 接受的信息
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("消费者:"+new String(body));
                    }
                };
                channel.basicConsume("队列",true,callback);
        
            }
              }
    
    展开全文
  • 发布订阅模式下创建RabbitMq实例 发布订阅模式queueName必须为空,要传入交换机exChangeName的名称,routingkey为空 rabbitmq服务封装 package rabbitmq import ( "fmt" "github.com/pkg/errors" "github....

    订阅模式:一个生产者发送的消息会被多个消费者获取。

    在这里插入图片描述

    发布订阅模式下创建RabbitMq实例

    发布订阅模式queueName必须为空,要传入交换机exChangeName的名称,routingkey为空

    rabbitmq服务封装

    package rabbitmq
    
    import (
    	"fmt"
    	"github.com/pkg/errors"
    	"github.com/streadway/amqp"
    )
    
    /****************RabbitMQ初始化start********************/
    
    // 除了simple 模式外、其他的模式都是由 队列 交换机 key 不同组合实现的
    type RabbitMQ struct {
    	conn      *amqp.Connection
    	channel   *amqp.Channel
    	QueueName string //队列
    	Exchange  string //交换机
    	Key       string //key
    	MQUrl     string //连接信息
    }
    
    // 创建RabbitMQ 实例
    func newRabbitMQ(queueName, exchange, key string) (*RabbitMQ, error) {
    	rabbitMQ := &RabbitMQ{
    		QueueName: queueName,
    		Exchange:  exchange,
    		Key:       key,
    		MQUrl: "amqp://user:password@127.0.0.1:5672/", //Virtual host用的哪个库
    	}
    
    	var err error
    	// dial mq
    	rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl)
    	rabbitMQ.failOnErr(err, "创建连接错误")
    	if err != nil {
    		return nil, errors.Wrap(err, "create connect error")
    	}
    	// get channel
    	rabbitMQ.channel, err = rabbitMQ.conn.Channel()
    	if err != nil {
    		return nil, errors.Wrap(err, "get channel err")
    	}
    
    	return rabbitMQ, nil
    }
    
    // 错误处理
    func (r *RabbitMQ) failOnErr(err error, message string) {
    	if err != nil {
    		panic(fmt.Sprintf("%s:%s", err.Error(), message))
    	}
    }
    
    /**************RabbitMQ初始化end*********/
    
    /*****************订阅模式start***************************/
    
    //订阅模式创建RabbitMQ实例
    func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {
    	//在simple模式下 exchange and key 都为空
    	rabbitMQ, err := newRabbitMQ("", exchangeName, "")
    
    	if err != nil {
    		return nil, err
    	}
    	return rabbitMQ, nil
    }
    
    //订阅模式生产
    func (r *RabbitMQ) PublishPub(message string) error {
    
    	// 2.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建
    	// 保证队列存在、 消息能发送到队列中
    	err := r.channel.ExchangeDeclare(
    		r.Exchange,
    		//是否持久化
    		"fanout",
    		// 是否自动删除
    		true,
    		// 是否具有排他性
    		false,
    		//是否阻塞
    		false,
    		// 额外属性
    		false,
    		nil,
    	)
    	if err != nil {
    		return errors.Wrap(err, "QueueDeclare err")
    	}
    
    	// 2.2 发送消息到队列中
    	err = r.channel.Publish(
    		r.Exchange,
    		"",
    		// 如果为true  根据exchange 类型 和 routkey规则、 如果无法找到符合条件的队列、那么会把发送完的消息返回给发送者
    		false,
    		// 如果为true  当exchange发送消息 到队列后发现队列上没有绑定消费者, 则会把消息还给 发送者
    		false,
    		amqp.Publishing{
    			// 消息持久化
    			DeliveryMode: amqp.Persistent,
    			ContentType:  "text/plain",
    			Body:         []byte(message),
    		},
    	)
    	if err != nil {
    		return errors.Wrap(err, "Publish err")
    	}
    
    	return nil
    }
    
    
    //订阅模式消费
    // Step 3. 
    func (r *RabbitMQ) PublishSub() {
    
    	// 3.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建
    	// 保证队列存在、 消息能发送到队列中
    	err := r.channel.ExchangeDeclare(
    		r.Exchange,
    		//是否持久化
    		"fanout",
    		// 是否自动删除
    		true,
    		// 是否具有排他性
    		false,
    		//是否阻塞
    		false,
    		// 额外属性
    		false,
    		nil,
    	)
    	if err != nil {
    		fmt.Println("Consume err: ", err)
    	}
    
    	q, err := r.channel.QueueDeclare(
    		"",
    		// 用来区分多个消费者
    		false,
    		// 是否自动应答 (消费完了后 是否自动告诉rabbitmq服务 消费完了 默认为true)
    		true,
    		// 是否具有排他性 (true 就是 创建了自己可见的队列)
    		true,
    		// 如果为true 表示 不能将同一个 connection 中发送的消息传递给这个connection中的消费者 (默认为false)
    		false,
    		// 是否阻塞 false为不阻塞  (阻塞就是 这个消费完 下个再进来、我们系统中是需要的)
    		nil,
    		// 其他参数
    	)
    
    	if err != nil {
    		fmt.Println("Consume err: ", err)
    	}
    
    	err = r.channel.QueueBind(
    		q.Name,
    		"",
    		r.Exchange,
    		false,
    		nil,
    	)
    
    
    	message, err := r.channel.Consume(
    		r.QueueName,
    		// 用来区分多个消费者
    		"",
    		// 是否自动应答 (消费完了后 是否自动告诉rabbitmq服务 消费完了 默认为true)
    		true,
    		// 是否具有排他性 (true 就是 创建了自己可见的队列)
    		false,
    		// 如果为true 表示 不能将同一个 connection 中发送的消息传递给这个connection中的消费者 (默认为false)
    		false,
    		// 是否阻塞 false为不阻塞  (阻塞就是 这个消费完 下个再进来、我们系统中是需要的)
    		true,
    		// 其他参数
    		nil,
    	)
    	if err != nil {
    		fmt.Println("Consume err: ", err)
    	}
    
    	//不让协程终止
    	forever := make(chan bool)
    
    	// 启用 goroutine 处理消息
    	go func() {
    		for d := range message {
    			//接收消息进行逻辑处理
    			fmt.Printf("Received a message: %s \n", d.Body)
    		}
    	}()
    
    	// 在没有消息处理后 进行阻塞
    	<-forever 	//不让协程终止
    }
    
    /************订阅模式end********************************/

    消费者代码1

    package main
    
    import (
    	"MqTest/rabbitmq"
    	"fmt"
    )
    
    func main() {
    	exchangeName := "exchangeName"
    	rabbitMQ, err  := rabbitmq.NewRabbitMQPubSub(exchangeName)
    	if err != nil {
    		fmt.Println(err)
    	}
    	if err != nil {
    		fmt.Println(err)
    	}
    	rabbitMQ.PublishSub()
    }
    

    消费者代码2

    package main
    
    import (
    	"MqTest/rabbitmq"
    	"fmt"
    )
    
    func main() {
    	exchangeName := "exchangeName"
    	rabbitMQ, err  := rabbitmq.NewRabbitMQPubSub(exchangeName)
    	if err != nil {
    		fmt.Println(err)
    	}
    	if err != nil {
    		fmt.Println(err)
    	}
    	rabbitMQ.PublishSub()
    }
    

    生产者代码

    package main
    
    import (
    	"MqTest/rabbitmq"
    	"fmt"
    	"strconv"
    )
    
    func main() {
    	exchangeName := "exchangeName"
    	rabbitMQ, err  := rabbitmq.NewRabbitMQPubSub(exchangeName)
    	if err != nil {
    		fmt.Println(err)
    	}
    	for i:=0; i<10;i++ {
    		data := "第"+strconv.Itoa(i)+"次学习mq"
    		rabbitMQ.PublishPub(data)
    	}
    	fmt.Println("发送成功")
    }
    

     

    展开全文
  • RabbitMQ发布订阅模式原理和实现(交换机) 这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展。 功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个...

    RabbitMQ发布订阅模式原理和实现(交换机)

    这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展。
    功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。
     思路解读(重点理解): 
    (1)一个生产者,多个消费者
    (2)每一个消费者都有自己的一个队列
    (3)生产者没有直接发消息到队列中,而是发送到交换机
    (4)每个消费者的队列都绑定到交换机上
    (5)消息通过交换机到达每个消费者的队列
    该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
    以用户发邮件案例讲解
    注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失。

    交换机的作用:

    生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,这和我们之前学习Nginx有点类似。
    交换机的作用根据具体的路由策略分发到不同的队列中,交换机有四种类型。
    Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的
    Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
    Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列
    Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。 

     

     

    Rabbit高级队列(发布订阅)

      生产者投递消息给交换机缓存起来(不会直接给队列),交换机根据路由策略RoutingKey转发到不同的队列服务器中。队列服务器再以推送或者拉取形式让消费者消费。(类似Nginx)

      

    RabbitMQ发布与订阅原理: 

       案例:   用户注册 ---> 发送邮件 --->发送短信  

          

      


     

     pom文件:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.toov5.rabibitMQScribe</groupId>
      <artifactId>rabibitMQScribe</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      
      <dependencies>
    		<dependency>
    			<groupId>com.rabbitmq</groupId>
    			<artifactId>amqp-client</artifactId>
    			<version>3.6.5</version>
    		</dependency>
    	</dependencies>
      
    </project>
    

     连接工具类:

    package com.toov5.utils;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
     
    //没有做成单例的  VirtualHost 需要复用
    public class MQConnectionUtils {
        //创建新的连接
        public static Connection newConnection() throws IOException, TimeoutException {
             //创建连接工厂
        ConnectionFactory factory= new ConnectionFactory();
        //链接地址
        factory.setHost("192.168.91.6");
        //用户名称
        factory.setUsername("admin");
        //用户密码
        factory.setPassword("admin");
        //amqp端口号
        factory.setPort(5672);
        //连接virtualhost
        factory.setVirtualHost("/admin_toov5");
        Connection connection = factory.newConnection();
            return connection;
        }    
    }

     

    生产者:

    package com.toov5.fanout;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.toov5.utils.MQConnectionUtils;
    
    //生产者 交换机类型 producerFanout类型
    public class FanoutProducer {
        //交换机名称
         private static final String EXCHANGE_NAME = "my_fanout"; 
         public static void main(String[] args) throws IOException, TimeoutException {
            //建立MQ连接
             Connection connection = MQConnectionUtils.newConnection();
            //创建通道
              Channel channel = connection.createChannel();
              //生产者绑定交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  //交换机名称  交换机类型
              //创建对应的消息 
              String msString = "my_fanout_destination_msg";
              //通过频道 发送消息
              System.out.println("生产者投递消息:"+msString);
              //消息投递到交换机里面去
              channel.basicPublish(EXCHANGE_NAME, "", null, msString.getBytes());
              //关闭通道 和 连接
              channel.close();
              connection.close();
        }
        
    } 

    消费者:

    package com.toov5.fanout;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.toov5.utils.MQConnectionUtils;
    
    //邮件消费者
    public class ConsumerEmailFanout {
        private static final String EMAIL_QUEUE ="email_queue_fanout";
        //交换机名称
       private static final String EXCHANGE_NAME = "my_fanout"; 
         public static void main(String[] args) throws IOException, TimeoutException {
             System.out.println("邮件消费者启动");
            //建立MQ连接
             Connection connection = MQConnectionUtils.newConnection();
            //创建通道
              Channel channel = connection.createChannel();
              
           //消费者声明队列
              channel.queueDeclare(EMAIL_QUEUE, false, false, false, null);
             //消费者队列绑定交换机
              channel.queueBind(EMAIL_QUEUE, EXCHANGE_NAME, "");
              //消费者监听消息
          DefaultConsumer defaultConsumer =     new DefaultConsumer(channel) {
                  //重写监听方法
                 @Override
                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                         throws IOException {             
                     String msg = new String(body,"UTF-8");
                     System.out.println("邮件消费者获取生产者消息"+msg);
                 }
             };
             channel.basicConsume(EMAIL_QUEUE,true, defaultConsumer);   //绑定队列 事件监听
                
        }
    }
    package com.toov5.fanout;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.toov5.utils.MQConnectionUtils;
    
    //邮件消费者
    public class ConsumerSMSFanout {
        private static final String SMS_QUEUE ="sms_queue_fanout";
        //交换机名称
       private static final String EXCHANGE_NAME = "my_fanout"; 
         public static void main(String[] args) throws IOException, TimeoutException {
             System.out.println("短信消费者启动");
            //建立MQ连接
             Connection connection = MQConnectionUtils.newConnection();
            //创建通道
              Channel channel = connection.createChannel();
              
           //消费者声明队列
              channel.queueDeclare(SMS_QUEUE, false, false, false, null);
             //消费者队列绑定交换机
              channel.queueBind(SMS_QUEUE, EXCHANGE_NAME, "");
              //消费者监听消息
          DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                  //重写监听方法
                 @Override
                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                         throws IOException {             
                     String msg = new String(body,"UTF-8");
                     System.out.println("邮件消费者获取生产者消息"+msg);
                 }
             };
             channel.basicConsume(SMS_QUEUE,true, defaultConsumer);   //绑定队列 事件监听
      
        }
    }

     

     

     

    展开全文
  • 如果你学过vue,我想你一定对vue的父子组件通信产生过好奇...我也很好奇,所以我就去百度同时看了vue2的源码,发现里面使用了一种设计模式,叫发布-订阅模式。于是,我就详详细细的去了解了发布-订阅模式是怎么回事...
    如果你学过vue,我想你一定对vue的父子组件通信产生过好奇,为什么子组件中$emit函数中 填写event:事件名,args:传递的参数,在父组件中写上对应的event事件名,和回调函数,就可以在回调函数中接收到子组件传递的args参数呢?我也很好奇,所以我就去百度同时看了vue2的源码,发现里面使用了一种设计模式,叫发布-订阅模式。于是,我就详详细细的去了解了发布-订阅模式是怎么回事。下面我也用我的方式来描述一下,我对发布订阅模式来的理解,以及我遇到的一些问题和如何解决这些问题。发布-订阅模式,顾名思义,就是发布者发布消息,订阅者就可以用接收到发布者发布的信息,很好理解。转化成代码,要怎么写呢?看下面示例:
    let eventsMixin = {      /* 收集订阅者 */      events: {},      /* 参数:订阅的主题。回调函数, 收集依赖 */      on(event, fn) {        (this.events[event] || (this.events[event] = [])).push(fn);      },      /* 发布数据 */      emit(event, content) {        if (!this.events[event] || this.events[event].length === 0) {          console.log('没有人订阅信息--->', event, content);          return;        }        let tempList = this.events[event];        tempList.forEach(fn => {           fn(content)        })      }    }    eventsMixin.on('handleTitle', handleTitle);    eventsMixin.emit('handleTitle', '标题');     /* 处理事件 */    function handleTitle(title) {      console.log('收到消息了--->', title);    }    
    首先定义了一个eventsMixin对象,其中有一个events空对象,两个函数,on函数接收一个事件名,一个回调函数。emit函数接收一个事件名,一个需要发送的消息参数。on函数中把传递进来的fn函数,push到events对象的event属性中,看第六行代码,没有该变量先创建否则会报错(可想而知)。emit函数中去遍历events对象的event属性中的数据,并执行数据中的方法,因为在on函数中的event属性存的是个数组函数,此时emit函数中正好传递进来的数据,那么这个参数也正好可以被此前的fn函数中的参数接收,这样就完成了,订阅者订阅到了发布者发布的消息了。看结果:44d4a25fc9cce03aff7c67d7b7741c19.png我自己打出这简单的几行代码以后,震惊到了,就这么几行代码,就很巧妙的实现了功能,对于数组中存储方法,在特定的时间再去执行该方法有了更加深刻的理解。下面在试试不同的事件,多个相同的事件,不同的处理,会不会收到数据
    eventsMixin.on('handleTitle', handleTitle);eventsMixin.on('content', handleContent);eventsMixin.on('content', handleContent1);eventsMixin.emit('handleTitle', '标题');eventsMixin.emit('content', '内容'); function handleTitle(title) {  console.log('收到消息了--->', title);}function handleContent(content) {  console.log('收到消息了--->', content);}function handleContent1(content) {  console.log('收到消息了1--->', content);}

    8b8a7e39a94031e4b5888655dfc13f06.png

    答案是肯定的。至此我们对vue的子向传值的原理,和发布-订阅模式应该有了了解了。那么对定的off ,once 方法我们是不是也顺便了解了解呢?off的话就是,删除所有订阅者,也可以删除对应发布者的所有订阅者,还可以是对应发布者中某个订阅者,对应代码就是删除events对象中特定的数据。once的话和on很相似,只是发布者发布的消息只会接收到一次,仅此而已,对应代码就是先订阅到数据后,再删除。
    // 取消订阅off 函数在eventsMixin对象中,简写了off(event, fn) {  /* 需要校验参数 */  let _this = this;  // let fns = _this.events[event];  /* 没有参数,移除所有的事件监听器 */  if (!arguments.length) {    _this.events = {};    return;  }  /* 只有事件名称,移除该事件所有的监听器 */  if (arguments.length === 1) {    delete _this.events[event];    return;  }  /* 如果同时提供了事件与回调,则只移除这个回调的监听器 */  if (arguments.length === 2) {    let fnList = _this.events[event];    if (!fnList) return;    // 若有 fn,遍历缓存列表,看看传入的 fn 与哪个函数相同,如果相同就直接从缓存列表中删掉即可    for (let i = 0; i < fnList.length; i++) {      if (fnList[i] === fn) {              fnList.splice(i, 1);        break      }    }  }}eventsMixin.on('handleTitle', handleTitle);eventsMixin.on('content', handleContent);eventsMixin.on('content', handleContent1);eventsMixin.emit('handleTitle', '标题');eventsMixin.emit('content', '内容');eventsMixin.off('content', handleContent);eventsMixin.emit('content', '新的内容');
    看结果:

    b11fbab1a50505c4578d8a284e3ea341.png

    'content', handleContent 对应订阅者,被删除了。所以发布者再次发布信息的时候,只有'content', handleContent1对应的订阅者才能收到数据。看off函数,参数:一个事件名,一个函数。首先,如果没有参数,就删除所有的订阅者,如果只有一个参数,就删除对应的事件的所有订阅者,如果两个参数,就删除对应事件的对应订阅者。我想上面的代码应该很好理解的。下面再来介绍once方法,只触发一次。
    /* 监听一次  函数在eventsMixin对象中,简写了*/once(event, fn) {  let _this = this;  // 先绑定,调用后删除  //    console.log(fn);  function on() {    _this.off(event, on);    fn.apply(_this, arguments);  }  on.fn = fn;  _this.on(event, on);},
    eventsMixin.once('content', handleContent);eventsMixin.emit('content', '内容');eventsMixin.emit('content', '新的内容');

    f5357fd5dbb1f7032f9a07b0a0752e3e.png

    once函数里面还定义了一个on函数(姑且叫内部on函数),内部on函数中执行off方法,和fn函数(姑且叫内部fn函数)。内部fn函数的绑定了once方法中的形参fn。最后执行on函数(外部的on函数)。这里超级绕,有一个办法可以理清楚,那就是打断点调试,就可以看到函数的执行过程了。

    最终结果也符合我们的预期。

    难道到这里就结束了吗?并没有,下面我想分享的才是我遇到的坑和网上分享不同的地方。拿起小本本仔细听。

    当我们再次订阅信息的时候

    eventsMixin.once('content', handleContent);eventsMixin.once('content', handleContent1);eventsMixin.emit('content', '内容');

    fb96cfbcff1222d832b3c8d197da521c.png

    发现只打印出来一条信息了,按道理应该打印出来两条信息才对,因为我分别订阅了两次内容。

    我看了很多遍代码后,也没有看出什么毛病,所以我就试着打断点去调试,发现当我们去做删除的时候eventList数组发生了变化,我们再在emit函数中进行forEach循环,那么就得不到数组中第二个函数了,它现在变成第一个了。那要怎么解决,好办,反向遍历即可。

    let i = eventList.length;while (i--) {  eventList[i](content);}

    f0a5b263678e950bd605823dbb4089fe.png

    得到数据了。所以遍历的时候去删除数组中的数据就要注意了,这一点,我在java遍历数组并删除数据遇到的问题也做过相应的笔记。当时使用的是迭代器。es6中也引入了迭代器的概念,那么js的迭代器也可以实现(不过我还没试,你可以试试)。

    集合与泛型第三篇

    在vue2源码中也能看到这样的处理(我当然是抄袭的了102802134d5083292e615364c8926f7c.png)

    Vue.prototype.$off = function (event, fn) {      var vm = this;      // all      if (!arguments.length) {        vm._events = Object.create(null);        return vm      }      // array of events      if (Array.isArray(event)) {        for (var i$1 = 0, l = event.length; i$1 < l; i$1++) {          vm.$off(event[i$1], fn);        }        return vm      }      // specific event      var cbs = vm._events[event];      if (!cbs) {        return vm      }      if (!fn) {        vm._events[event] = null;        return vm      }      // specific handler      var cb;      var i = cbs.length;      while (i--) {        cb = cbs[i];        if (cb === fn || cb.fn === fn) {          cbs.splice(i, 1);          break        }      }      return vm    };

    只不过源码里面是写在off函数里的。原理还是一样倒过来遍历数组

    下面是完整的代码

    <html lang="en"><head>  <meta charset="UTF-8">  <meta name="viewport" content="width=device-width, initial-scale=1.0">  <title>Documenttitle>head><body>  <script>    /* 定义调度中心 */    let eventsMixin = {      /* 收集订阅的主题 */      events: {},      /* 参数:订阅的主题。回调函数, 收集依赖 */      on(event, fn) {        (this.events[event] || (this.events[event] = [])).push(fn);      },      /* 发布数据 */      emit(event, content) {        if (!this.events[event] || this.events[event].length === 0) {          console.log('没有人订阅信息--->', event, content);          return;        }        let eventList = this.events[event];        // eventList.forEach(fn => {        //   fn(content)        // })        // for (let i in eventList) {        //     eventList[i](content);        // }        // for (let i = 0; i < eventList.length; i++) {        //    eventList[i](content);        // }        let i = eventList.length;        while (i--) {          eventList[i](content);        }        // for (let item of eventList) {        //   item(content);        // }      },      /* 监听一次 */      once(event, fn) {        let _this = this;        // 先绑定,调用后删除        //    console.log(fn);        function on() {          // console.log(fn);          _this.off(event, on);          fn.apply(_this, arguments);        }        on.fn = fn;        _this.on(event, on);      },      // 取消订阅      off(event, fn) {        /* 需要校验参数 */        let _this = this;        // let fns = _this.events[event];        /* 没有参数,移除所有的事件监听器 */        if (!arguments.length) {          _this.events = {};          return;        }        /* 只有事件名称,移除该事件所有的监听器 */        if (arguments.length === 1) {          delete _this.events[event];          return;        }        /* 如果同时提供了事件与回调,则只移除这个回调的监听器 */        if (arguments.length === 2) {          let fnList = _this.events[event];          if (!fnList) return;          // 若有 fn,遍历缓存列表,看看传入的 fn 与哪个函数相同,如果相同就直接从缓存列表中删掉即可          for (let i = 0; i < fnList.length; i++) {            if (fnList[i] === fn || fnList[i].fn === fn) {              fnList.splice(i, 1);              // console.log(fnList);              break            }          }          // let i = fnList.length          // while (i--) {          //     cb = fnList[i];          //     if (cb === fn || cb.fn === fn) {          //         fnList.splice(i, 1);          //         break          //     }          // }        }      }    }    /* 订阅者 到订阅调度中心,调度想要的内容(即主题) */    // eventsMixin.once('handleTitle', handleTitle);    eventsMixin.once('content', handleContent);    eventsMixin.once('content', handleContent1);    // eventsMixin.once('content', handleContent2);    // eventsMixin.once('content', handleTitle5);    /* 发布者 向调度中心发布内容,(要设置主题,目的,只有订阅了该主题的用户才可以获取到相应的数据)  */    // eventsMixin.emit('handleTitle', '标题');    // eventsMixin.emit('content', '内容');    eventsMixin.emit('content', '内容');    // eventsMixin.off('content', handleContent);    // eventsMixin.off();    // eventsMixin.emit('content', '新的内容');    // eventsMixin.on('content', handleTitle);    // eventsMixin.emit('content', '内容');    // eventsMixin.off();    /* 处理事件 */    function handleTitle(title) {      console.log('收到消息了--->', title);    }    function handleContent(content) {      console.log('收到消息了--->', content);    }    function handleContent1(content) {      console.log('收到消息了1--->', content);    }    function handleContent2(content) {      console.log('收到消息了2--->', content);    }    function handleTitle5(title) {      console.log('收到消息了55--->', title);    }script>body>html>
    总结:发布者与订阅者 通过event事件名来建立关系,订阅者通过回调函数接收发布者发布的消息

    9369075fe48e67a6973d76c201a09e21.png

    长按关注,听wzg扯淡

    展开全文
  • C#RabbitMQ发布订阅模式配置

    千次阅读 2018-11-08 16:59:02
    首先我们需要搭建rabbitmq服务器来中转我们的消息。 1.环境搭建 1.1由于RabbitMQ使用Erlang语言编写,所有我们需要先安装Erlang环境(安装没什么难度直接双击运行就可以了) 1.2安装RabbitMQ服务端程序 以上...
  • 一、概念AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,...AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言...
  • rabbitmq发布订阅模式-生产者

    千次阅读 2020-09-13 08:51:50
    pom: <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>...
  • rabbitmq发布订阅模式-消费者

    千次阅读 2020-09-13 08:52:08
    } } yml: server: port: 8081 spring: rabbitmq: host: 192.168.31.97 port: 5672 password: admin username: admin virtual-host: / order: fanout: #交换机 exchange: order.fanout.exchange #log日志队列 log: ...
  • 1、消息发送方(发布者) 1)添加maven依赖 <!-- springboot rabbitmq 使用--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-...
  • 本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式。为了阐述这个模式,我们将会搭建一个简单的日志系统,它包含两种程序:一种发送日志消息,另一种接收并打印日志...
  • 发布订阅模式下创建RabbitMq实例 发布订阅模式queueName必须为空,要传入交换机exChangeName的名称,routingkey为空 func NewRabbitMqPubSub(exchangeName string) *RabbitMq { //创建mq实例 rabbitmq := ...
  • rabbitmq发布订阅

    2019-10-12 09:18:23
    rabbitmq发布订阅 如果觉得还可以 记得关注一下公众号哦!一起交流学习! 一、发布订阅模式 还记得我们上一个文章是如何发布消息的吗? 回顾一下以前是如何发送消息的: channel.basicPublish("", QUEUE_NAME, ...
  • 发布订阅模式即向多个消费者传递同一条信息1).Exchanges 交换机RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。相反,生产者只能向交换机(Exchange)发送消息。交换机是一个非常简单的...
  • springboot集成rabbitmq-发布订阅模式
  • RabbitMQ发布订阅模式 生产者: package com.xuecheng.rabbitmq.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ...
  • RabbitMQ发布订阅

    2018-06-15 09:50:43
    这种普遍周知的发布订阅模式。 为了解释这种模式,我们将创建一个简单的日志系统,由两个程序构成--第一个将发送日志消息,第二个将接收和打印这个消息。 在我们的日志系统每一个运行赋值的接收程序都将获取到消...
  • 这里写了一个简单的springboot的demo来处理RabbitMq发布订阅 添加pom依赖 <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <...
  • RabbitMQ发布/订阅模式

    2019-09-30 14:46:54
    1、生产者 ... ...import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */public class Producer { priva...
  • rabbitmq-发布订阅模式

    2021-03-06 22:48:00
    本文po出 mq的发布订阅模式,及代码示例; 【1】intro 1) 角色: 有4个角色, 包括 生产者,消费者, 交换机 exchange(X), 队列; 2)交换机: 一方面,接收生产者的消息,另一方面,处理消息,如发送给...
  • 就是发送一个消息所有的订阅的消费都可以接收1消费者把消息发布在交换机2消费者去交换机订阅消息3一旦交换机有消息就会给消费者进行消费发送者:import com.rabbitmq.client.Channel;import ...
  • Publish/Subscribe 模式 ...而我们今天要聊的这个 发布/订阅模式 有着更复杂的工作模式, 他可以将一个消息发给多个消费者。如下图所示: Exchanges 从上面的图中,我们可以看到明显比之前的工作队列...
  • RabbitMq-发布订阅模式

    2019-04-13 17:41:49
    import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.I...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,220
精华内容 488
关键字:

rabbitmq发布订阅模式