精华内容
下载资源
问答
  • 生产消费者模式与订阅发布模式(观察者模式)区别
    千次阅读
    2020-03-02 11:45:41

    订阅发布模式是一种特殊的生产消费者模式

     

    区别:

    1.消息是否被多个对象处理。生产消费者是所有消费者抢占消息,订阅发布是所有订阅者共享消息。

    2.主动权不同。生产消费者主动权在消费者,订阅发布主动权在发布者。也就说订阅者是把主动权交给了发布者,从代码层面更好的实现解耦。

    更多相关内容
  • 订阅发布者模式本质上也是一种...订阅发布者模式有时也称为观察者模式,订阅发布者(观察这观察者)存在着主动被动的关系,而生产者消费者比较中性吧。 订阅发布模式定义了一种一对多的依赖关系,让多个订阅者...

    订阅发布者模式本质上也是一种生产者消费者模式,订阅者是消费者,发布者是生产者。如果一定要说个区别,就是抽象级别的区别吧。

     

    订阅者肯定是个消费者,但消费者不一定是订阅者,发布者一定是个生产者,但生产者不一定是个发布者。

    订阅发布者模式有时也称为观察者模式,订阅发布者(观察这和被观察者)存在着主动被动的关系,而生产者消费者比较中性吧。 订阅发布模式定义了一种一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有订阅者对象,使它们能够自动更新自己的状态。而生产者消费者关系可以是1对1,1对多,多对1,多对多关系

     

    补充:在23种设计模式中的观察者模式中,并没有中间介-队列的概念,但生产者消费者模式再多线程环境下好像天生就有队列的概念。在订阅发布者之间引入消息队列后,可以实现订阅者和发布者之间的解耦,任务可以很好的以异步方式进行处理,所以说是否有中间队列不是订阅发布者模式和生产者消费者模式的区别

     

     

     

    如下蓝色字体部分为参考内容:

    如下摘自:http://www.codeceo.com/article/javascript-observer-pattern.html

    那么到底什么是观察者模式呢. 先看看生活中的观察者模式。

    好莱坞有句名言. “不要给我打电话, 我会给你打电话”. 这句话就解释了一个观察者模式的来龙去脉。 其中“我”是发布者, “你”是订阅者。

     

    再举个例子,我来公司面试的时候,完事之后每个面试官都会对我说:“请留下你的联系方式, 有消息我们会通知你”。 在这里“我”是订阅者, 面试官是发布者。所以我不用每天或者每小时都去询问面试结果, 通讯的主动权掌握在了面试官手上。而我只需要提供一个联系方式。

     

     

     

     

     

     

     

     

     

     

     

     

     

     

    展开全文
  • 个人学习笔记分享,当前能力有限,请勿贬低,菜鸟互学,大佬绕道 如有勘误,欢迎指出讨论,本文后期也会进行修正补充 前言 设想一个场景: 你你女朋友(假设有)打算出门,你问你女朋友打扮好了没,...消费者A.

    【Redis】生产者/消费者模式 与 订阅/发布模式

    个人学习笔记分享,当前能力有限,请勿贬低,菜鸟互学,大佬绕道

    如有勘误,欢迎指出和讨论,本文后期也会进行修正和补充

    前言

    设想一个场景:
    你和你女朋友(假设有)打算出门,你问你女朋友打扮好了没,她说还没。
    于是过了五分钟,你再去问,她说还没。
    再过五分钟,你又问,她说再等会儿
    再过五分钟,你又问,她说烦不烦一直问,多等会能咋了??。。。。
    2000 years later,她准备好了,你已经准备打开网抑云了

    其实解决方案很简单,你跟她说准备好了喊你,而你做好随时被喊的准备即可

    设想如下模型:

    1. 消费者A向生产者B取货。如果B手上没有足够多的货,那么A就得一直等待,直到B有货交付给A;如果B生产的货物已经达到库存上限,那么B需要等待A取货后库存减少,再继续生产。
    2. 扩展场景3,A监听仓库,如果仓库库存增加,就取货,其余时间摸鱼,不关心生产相关的事情;B只负责生产,不关心消费相关事情。

    有发现1和2的区别吗?

    在1中,消费者和生产者需要一直询问仓库库存数量,即双方保持沟通,显然太麻烦了。
    在2中,生产者一直生产给仓库即可,其余事情不关心;消费者监听仓库,当仓库数量变动时通知消费者,其余事情不关心。生产者和消费者互不沟通。
    (什么?你问库存满了咋办?大部分情况下会保证仓库足够大,实在满了。。。通常是把旧的直接丢弃掉。。。啊这。。)

    这样就实现了一个极其关键的思想-解耦

    实际上,1被成为生产者/消费者模式,2被称为订阅/发布模式(又称观察者模式)
    后者可以算是对前者的优化方案。前者可以满足1-1,N-1,1-N,N-N,而后者通常是1-N,或者说N-1-N

    本文仅基于redis和java实现,重在整理思路,有兴趣可以自己查其他方面

    1.生产者/消费者模式

    1.1.场景预设

    消费者A从生产者B中读取数据,若有满足要求数据则返回,若无则等待直到满足为止

    生产者暂不考虑库存溢出的情况

    1.2.理论基础

    • 消费者A向仓库取货

      • 库存充足时,A取货成功,结束此操作;
      • 库存不足时,则A等待。直到有库存,A再取货,若成功,则结束,失败,则继续等待

      有没有疑惑,为什么还会失败?很简单,因为可能不止一个消费者,如果没抢到,就只能等下一轮了;也有可能进货了依然不够,那还得等到够了为止,可没有买一半这种说法

    • 生产者B向仓库出货

      • 仓库空间足够,则B出货成功,结束此操作;
      • 仓库空间不足,则B等待。直到库存变动,A再出货,若成功,则结束,失败,则继续等待

      同理生产者也可能多个,也可能库存变动后,空间依然不足,那么就得继续等了

    1.3.技术基础

    • redis可以存储队列,可以对队首或队尾进行push或者pop操作

      命令格式如下:lpop/rpop/lpush/rpush key模拟操作如下

      image-20200809155920684

    • redis可以进行堵塞读取,即同时检测多个键,若其中一个有元素,则读取,否则等待有数据为止

      命令格式如下:blpop/brpop key1 [key ...] timeout,模拟操作如下

      • 双开redis命令行A和B,A中从list1和list2中读取数据,无数据,则开始等待

        image-20200809161711444

      • B中向list2添加一个数据,则A中读取成功,读取出刚存入进去的数据

        image-20200809161736369

      • A中继续读取一个数据,若list1和list2中有数据则会读取成功,否则将进入等待,即第一步

        image-20200809161859534

    1.4.Java实现

    1. 配置redis,此处不再赘述,请自行询问度娘,或者参考我所整理的redis集成方法(没写几篇就不给传送门了)

    2. 配置redisPool,代码如下

      redis.properties

      redis.url=localhost
      redis.port=6379
      redis.maxIdle=30
      redis.minIdle=10
      redis.maxTotal=100
      redis.maxWait=10000
      

      JedisPoolUtils (自行添加jedis依赖)

      package com.yezi_tool.basic_project.commons.utils;
      
      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.JedisPool;
      import redis.clients.jedis.JedisPoolConfig;
      
      import java.io.IOException;
      import java.io.InputStream;
      import java.util.Properties;
      
      /**
       * @title Jedis线程池工具类
       * @description 用于控制Jedis线程
       * @author Echo_Ye
       * @date 2020/8/9 17:28
       * @email echo_yezi@qq.com
       */
      public class JedisPoolUtils {
      
          private static JedisPool pool = null;
      
          static {
      
              //加载配置文件
              InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis.properties");
              Properties pro = new Properties();
              try {
                  pro.load(in);
              } catch (IOException e) {
                  e.printStackTrace();
              }
      
              //获得池子对象
              JedisPoolConfig poolConfig = new JedisPoolConfig();
              poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大连接个数
              poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大闲置个数
              poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小闲置个数
              poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大连接数
              pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString()));
          }
      
          //获得jedis资源的方法
          public static Jedis getJedis() {
              return pool.getResource();
          }
      
          public static void main(String[] args) {
              Jedis jedis = getJedis();
              System.out.println(jedis);
          }
      }
      
    3. 消息生产者(开启3个线程生产,每3秒生产一个)

      package com.yezi_tool.basic_project.test.redisTest;
      
      import com.yezi_tool.basic_project.commons.utils.JedisPoolUtils;
      import redis.clients.jedis.Jedis;
      
      /**
       * @author Echo_Ye
       * @title 生产者模拟器
       * @description redis模拟生产者
       * @date 2020/8/9 16:26
       * @email echo_yezi@qq.com
       */
      public class MessageProducer extends Thread {
          //消息key,用于区分
          public static final String MESSAGE_KEY = "messageQueue";
          //消息序号
          private volatile Integer count = 0;
          private final int maxCount = 10;
          //睡眠时间为3000毫秒
          private static final long sleepTime = 3000;
      
          /**
           * 发送消息,为确保count不会错乱,加上同步锁
           */
          public synchronized void putMessage() {
              //消息体
              String message = "message" + count++;
              //存入消息
              Jedis jedis = JedisPoolUtils.getJedis();
              Long size = jedis.lpush(MESSAGE_KEY, message);
              System.out.println(Thread.currentThread().getName() + " put message:" + message + "   nowSize:" + size);
          }
      
          /**
           * 线程执行内容
           */
          @Override
          public void run() {
              while (count < maxCount) {
                  try {
                      putMessage();
                      //沉睡
                      Thread.sleep(sleepTime);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
      
          /**
           * 开启线程
           */
          public static void runMessageProducer() {
              MessageProducer messageProducer = new MessageProducer();
              Thread producer1 = new Thread(messageProducer, "producer1");
              Thread producer2 = new Thread(messageProducer, "producer2");
              Thread producer3 = new Thread(messageProducer, "producer3");
              producer1.start();
              producer2.start();
              producer3.start();
          }
      
          public static void main(String args[]) {
              runMessageProducer();
          }
      }
      
      

      执行结果如下

      image-20200809181416350

    4. 消息消费者(开启3个线程消费,每2秒消费一个)

      package com.yezi_tool.basic_project.test.redisTest;
      
      import com.yezi_tool.basic_project.commons.utils.JedisPoolUtils;
      import redis.clients.jedis.Jedis;
      
      /**
       * @author Echo_Ye
       * @title 消费者模拟器
       * @description redis模拟消费者
       * @date 2020/8/9 17:34
       * @email echo_yezi@qq.com
       */
      public class MessageConsumer implements Runnable {
          //消息key,用于区分
          public static final String MESSAGE_KEY = "messageQueue";
          //睡眠时间为2000毫秒
          private static final long sleepTime = 2000;
      
          /**
           * 消费消息
           */
          public void consumerMessage() {
              Jedis jedis = JedisPoolUtils.getJedis();
              String message = jedis.rpop(MESSAGE_KEY);
              System.out.println(Thread.currentThread().getName() + " consumer message:" + message);
          }
      
          /**
           * 线程执行内容
           */
          @Override
          public void run() {
              while (true) {
                  try {
                      consumerMessage();
                      //沉睡
                      Thread.sleep(sleepTime);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
      
          /**
           * 开启线程
           */
          public static void runMessageConsumer() {
              MessageConsumer messageConsumer = new MessageConsumer();
              Thread consumer1 = new Thread(messageConsumer, "consumer1");
              Thread consumer2 = new Thread(messageConsumer, "consumer2");
              Thread consumer3 = new Thread(messageConsumer, "consumer3");
              consumer1.start();
              consumer2.start();
              consumer3.start();
          }
      
          public static void main(String[] args) {
              runMessageConsumer();
          }
      }
      
      

      执行结果如下,当取不到数据时返回的是null

      image-20200809181523668

    5. 修改消费者,改为堵塞读取,即修改第24行为

      List<String> message = jedis.brpop(0, MESSAGE_KEY);
      

      执行结果如下,取不到数据时开始等待

      image-20200809181646363

    6. 此时重新启动生产者,执行结果如下

      image-20200809181725145

      可以看到消费者继续消费了,直到库存再次为空,便继续等待

    7. 同时启动生产者和消费者,已知消费者速度比生产者快,故让生产者先启动5s

      package com.yezi_tool.basic_project.test.redisTest;
      
      /**
       * @author Echo_Ye
       * @title 消费者/生产者模拟器
       * @description 同时启动生产者和消费者,查看运行结果
       * @date 2020/8/9 18:11
       * @email echo_yezi@qq.com
       */
      public class RedisTest {
          public static void main(String[] args) {
              //启动生产者
              MessageProducer.runMessageProducer();
              try {
                  //沉睡5秒后启动
                  Thread.sleep(5000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              //启动消费者
              MessageConsumer.runMessageConsumer();
          }
      }
      

      执行结果如下

      image-20200809181920070

    一切都在计划之中,对吧?那么继续看观察者模式

    2.订阅/发布模式

    2.1.场景预设

    订阅者A告知客户端自己订阅了,生产者B发送消息后需要发送给订阅者。

    2.2.理论基础

    • 订阅者A告知仓库自己订阅,当仓库有变动时,仓库将变动告知订阅者。不关心收到消息前的业务。
    • 发布者B向仓库发布信息。不关心后续业务
    • AB互不知晓对方,由仓库负责消息的接受和发布

    2.3.技术基础

    • 订阅消息:可以订阅一个或多个通道,收集来自这些通道的信息,命令格式为为subscribe channelA channelB

      模拟操作如下

      image-20200810094603338

    • 发布消息:可以向一个通道发布消息,命令格式为publish channel message,模拟操作如下发布三条消息

      image-20200810094741315

    • 此时在订阅端收到三条消息

      image-20200810094900200

    • 其他命令,暂不做演示,有兴趣可以自己玩玩

      • unsubscribe [channel1 [channel1 ...]]:取消订阅,即不再收到来自目标通道的消息,客户端不能取消,只能直接退出。。。
      • psubscribe pattern [pattern ...]:订阅一个或多个符合给定模式的频道,举例如下
        • psubscribe t*st可以匹配到testtastteast等等
        • psubscribe t[ea]st可以匹配到testtast
        • psubscribe t?st可以匹配到tasttbsttcst等等
      • psubscribe pattern [pattern ...]:取消订阅一个或多个符合给定模式的频道,跟上面那货差不多,订阅全部取消后会退出订阅状态

    2.4.补充

    1. 除开publish,其余所有命令的返回值均包含三个
      • 消息类型,即订阅、取消订阅等等
      • 相关的通道名
      • 消息内容,或者剩余订阅数量
    2. 使用psubscribe可以重复订阅一个频道多次:此时该通道(订阅N次)收到消息,则订阅者也会收到多条消息(N条)
    3. 使用subscribe + psubscribe可以重复订阅一个频道多次:此时该通道(订阅N次)收到消息,则订阅者也会收到多条消息(N条),但消息类型会被区分为messagepmessage
    4. punsubscribe无参数时会取消所有订阅
    5. subscribe 与psubscribe互不干扰,因而允许出现同时订阅某通道的情况

    2.4.Java实现

    2.4.1.常规订阅
    1. 消息生产者(发布者),将push操作改为publish操作,并在达到上限时发布终止命令

      package com.yezi_tool.basic_project.test.publishTest;
      
      import com.yezi_tool.basic_project.commons.utils.JedisPoolUtils;
      import redis.clients.jedis.Jedis;
      
      /**
       * @author Echo_Ye
       * @title 发布者模拟器
       * @description 模拟发布者
       * @date 2020/8/10 10:28
       * @email echo_yezi@qq.com
       */
      public class NewMessageProducer extends Thread {
          //消息key,用于区分
          public static final String MESSAGE_KEY = "messageQueue";
          //消息序号
          private volatile Integer count = 0;
          private final int maxCount = 10;
          //睡眠时间为3000毫秒
          private static final long sleepTime = 3000;
          //结束程序的消息
          public static final String EXIT_COMMAND = "exit";
      
          /**
           * 发送消息,为确保count不会错乱,加上同步锁
           */
          public synchronized void putMessage() {
              //消息体
              String message = "message" + count++;
              //达到上限时改为终止命令
              if (count == maxCount) {
                  message = EXIT_COMMAND;
              }
              //存入消息
              Jedis jedis = JedisPoolUtils.getJedis();
              Long size = jedis.publish(MESSAGE_KEY, message);
              System.out.println(Thread.currentThread().getName() + " publish message:" + message + "   receiverNum:" + size);
          }
      
          /**
           * 线程执行内容
           */
          @Override
          public void run() {
              while (count < maxCount) {
                  try {
                      putMessage();
                      //沉睡
                      Thread.sleep(sleepTime);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
      
          /**
           * 开启线程
           */
          public static void runMessageProducer() {
              NewMessageProducer messageProducer = new NewMessageProducer();
              Thread publisher1 = new Thread(messageProducer, "publisher1");
              Thread publisher2 = new Thread(messageProducer, "publisher2");
              Thread publisher3 = new Thread(messageProducer, "publisher3");
              publisher1.start();
              publisher2.start();
              publisher3.start();
          }
      
          public static void main(String args[]) {
              runMessageProducer();
          }
      }
      
      
    2. 消息消费者(订阅者),接收到终止命令时退出

      package com.yezi_tool.basic_project.test.publishTest;
      
      import com.yezi_tool.basic_project.commons.utils.JedisPoolUtils;
      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.JedisPubSub;
      
      
      /**
       * @title 订阅者模拟器
       * @description 订阅者模拟器
       * @author Echo_Ye
       * @date 2020/8/10 10:43
       * @email echo_yezi@qq.com
       */
      public class NewMessageConsumer implements Runnable {
          //消息key,用于区分
          public static final String MESSAGE_KEY = "messageQueue";
          //处理接收消息
          private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();
      
          /**
           * 消费消息
           */
          public void consumerMessage() {
              Jedis jedis = JedisPoolUtils.getJedis();
              jedis.subscribe(myJedisPubSub, MESSAGE_KEY);
          }
      
          /**
           * 线程执行内容
           */
          @Override
          public void run() {
              while (true) {
                  consumerMessage();
              }
          }
      
          /**
           * 开启线程
           */
          public static void runMessageConsumer() {
              NewMessageConsumer messageConsumer = new NewMessageConsumer();
              Thread subscriber1 = new Thread(messageConsumer, "subscriber1");
              Thread subscriber2 = new Thread(messageConsumer, "subscriber2");
              Thread subscriber3 = new Thread(messageConsumer, "subscriber3");
              subscriber1.start();
              subscriber2.start();
              subscriber3.start();
          }
      
          public static void main(String[] args) {
              runMessageConsumer();
          }
      
          /**
           * 继承JedisPubSub,重写接收消息的方法
           */
          class MyJedisPubSub extends JedisPubSub {
              /**
               * JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
               * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
               * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
               * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
               **/
              @Override
              public void onMessage(String channel, String message) {
                  System.out.println(Thread.currentThread().getName() + " channel:" + channel + "  message:" + message);
                  //接收到exit消息后退出
                  if (NewMessageProducer.EXIT_COMMAND.equals(message)) {
                      System.exit(0);
                  }
              }
          }
      }
      
      
    3. 同时启动订阅者和发布者,让发布者先启动5s

      package com.yezi_tool.basic_project.test.publishTest;
      
      /**
       * @author Echo_Ye
       * @title 订阅/发布模拟器
       * @description 同时启动订阅者和发布者,查看运行结果
       * @date 2020/8/10 10:43
       * @email echo_yezi@qq.com
       */
      public class NewRedisTest {
          public static void main(String[] args) {
              //启动生产者
              NewMessageProducer.runMessageProducer();
              try {
                  //沉睡5秒后启动
                  Thread.sleep(5000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              //启动消费者
              NewMessageConsumer.runMessageConsumer();
          }
      }
      
      

      执行结果如下

      image-20200810104632380

    2.4.2.按规则订阅和发布
    1. 修改发布者,发布命令修改为

          public synchronized void putMessage() {
              //消息体
              String message = "message" + count++;
              //达到上限时改为终止命令
              if (count == maxCount) {
                  message = EXIT_COMMAND;
              }
              //存入消息
              Jedis jedis = JedisPoolUtils.getJedis();
      //        Long size = jedis.publish(MESSAGE_KEY, message);
              Long size = jedis.publish(MESSAGE_KEY + "_" + Thread.currentThread().getName(), message);
              System.out.println(Thread.currentThread().getName() + " publish message:" + message + "   receiverNum:" + size);
          }
      
    2. 修改订阅者,订阅命令修改为

          public void consumerMessage() {
              Jedis jedis = JedisPoolUtils.getJedis();
      //        jedis.subscribe(myJedisPubSub, MESSAGE_KEY);
              jedis.psubscribe(myJedisPubSub, MESSAGE_KEY + "*");
          }
      
    3. 修改订阅者消息处理器

      /**
       * 继承JedisPubSub,重写接收消息的方法
       */
      class MyJedisPubSub extends JedisPubSub {
          @Override
          public void onPMessage(String pattern, String channel, String message) {
              System.out.println(Thread.currentThread().getName() + " channel:" + channel + "  message:" + message + "  pattern:" + pattern);
              //接收到exit消息后退出
              if (NewMessageProducer.EXIT_COMMAND.equals(message)) {
                  System.exit(0);
              }
          }
      }
      
    4. 执行结果如下

      image-20200810110348373

    可以看到我订阅了messageQueue*,则收到了来自messageQueue_publisher1messageQueue_publisher2messageQueue_publisher3的消息

    3.补充

    • 堵塞读取(brpop)后线程会处于堵塞状态,直至读取到数据或者超时,才会停止线程

    • 订阅后线程也会处于堵塞状态,直到取消所有订阅,或者关闭该线程

    • psubscribe订阅的通道,仅能通过punsubscribe取消订阅,且遵循严格字符串匹配规则,不会将其中通配符展开

      如,punsubscribe *不会退订channel.*,而必须使用punsubscribe channel.*

    • 请管理好线程,线程的不当管理会带来极大性能消耗,如果你也想cpu烤肉可以当我没说

    4.传送门

    没啥好传送的,网上的资料都很零碎,建议多查查,从各个方面思考

    BB两句

    我一直以为订阅/发布和生产/消费是同一个模式。。。后来越想越不对劲。。。


    作者:Echo_Ye

    WX:Echo_YeZ

    EMAIL :echo_yezi@qq.com

    个人站点:在搭了在搭了。。。(右键 - 新建文件夹)

    展开全文
  • 文章目录案例:文章推送观察者模式观察者模式的运作流程观察者模式解决的问题观察者模式大显身手总结要点应用场景生产者-消费者模型 VS 观察者模式完整代码 案例:文章推送 为了方便举例,我提出以下场景。 假设我...


    案例:文章推送

    假设我是一个科幻小说爱好者,我维护着一个叫做ScienceFictionPusher的公众号,定期向豆瓣、知乎等平台推送那些我觉得有趣的科幻小说,于是为了方便管理,我的推送程序是这样的逻辑
    在这里插入图片描述

    class ScienceFictionPusher 
    {
    public:
    	//推送内容
        void newPush()
        {
        	//分别向各大平台推送内容
            _zhihu->update(_url, _title, _desc);
            _douban->update(_url, _title, _desc);
        }
    	
    	//设置新的内容
        void setNewFiction(const std::string& url, const std::string& title, const std::string& desc)
        {
            _url = url;
            _title = title;-
            _desc = desc;
            newPush();
        }
    
    private:
        std::string _url;    //小说链接
        std::string _title;  //小说名
        std::string _desc;   //小说简介
        Douban* _douban;
        Zhihu* _zhihu;
    };
    

    上面这种实现方式咋一看没什么问题,甚至在某些地方处理的还不错,因为我们将内容的更新从平台主动的拉取变为了公众号的主动推送,大大减少了空转时间。因此,我们将代码投入使用

    随着粉丝越来越多,公众号的名气也越来越大,于是乎越来越多的平台开始邀请我的专栏入驻,但是此时就出现了问题
    在这里插入图片描述
    如果采用上面这种模式的话,当有大量的平台时,代码会是这样的,存在大量的冗余,可读性也极差

    void newPush()
    {
     	//分别向各大平台推送内容
         _zhihu->update(_url, _title, _desc);
         _douban->update(_url, _title, _desc);
         _wechat->update(_url, _title, _desc);
         _uc->update(_url, _title, _desc);
         _tiktok->update(_url, _title, _desc);
         _bilibili->update(_url, _title, _desc);
         _baidu->update(_url, _title, _desc);
         _csdn->update(_url, _title, _desc);
         ...........................
     }
    

    由于公众号的经营也存在波动,当流量大的时候我们会有新增的平台,当某个平台流量小的时候我们也不会再去维护,所以平台的数量是时刻变化的,那这样的代码就意味着我们需要时刻去程序中修改,无法动态的增加、删除,效率极低。

    那有什么好的解决方法吗?这就到了 观察者模式出场的时候


    观察者模式

    观察者模式也叫做发布订阅模式,它定义了对象之间的一对多依赖,当一个对象改变状态的时候,它的所有依赖着都会收到通知并自动更新。

    为了方便举例,这里我们将发布内容的对象称为主题接收内容的对象称为观察者

    观察者模式的运作流程

    在这里插入图片描述

    此时对象C也想要获取内容,所以它告诉主题他想要注册成为观察者
    在这里插入图片描述
    在这里插入图片描述
    由于主题发布的内容质量逐渐降低,对象A不再需要订阅,此时它请求注销主题
    在这里插入图片描述
    在这里插入图片描述
    从上面我们可以看到,主题主要做了三件事,注册、删除、通知观察者。而观察者所做的只是被动的接受主题提供的数据

    观察者模式解决的问题

    讲了这么多,其实观察者模式最主要的作用就是让主题和观察者松耦合:即这两个对象虽然互相可以交互,但是它们都不清楚彼此的细节

    主题只知道观察者实现了Observer接口,它并不需要知道观察者的具体类是谁,也不需要了解它究竟实现了什么,它只需要调用观察者的update将数据更新过去即可。

    同样的,因为主题依赖的只是实现了Observer接口的对象列表,所以无论我们是对观察者增加还是删除,都不会对主题造成影响,主题也不需要为了兼容这些观察者而去修改代码。

    甚至我们还可以在其他地方独立的复用主题和观察者,例如我们新增一个新的主题,又或者是新增一个观察者,由于二者并非紧耦合,所以不会有任何的影响。

    总结一下就是,这种设计将对象之间的互相依赖降到了最低,因此我们的程序具有弹性,能够应对各种变化。


    观察者模式大显身手

    回到上面的问题,当我们的公众号发布新内容的时候,我们会将这些内容推送到所有的入驻平台中,这正好就符合上面所说的观察者模式的场景。此时公众号充当主题对象,而平台充当观察者。

    此时完整的关系图如下
    在这里插入图片描述
    根据上面所提到的内容,我们抽象出具体的主题接口和观察者接口。为了方便使用不同语言的读者阅读,我会尽量少用C++的特性,如果还是有不理解的可以私信或者评论区留言。

    主题接口只需要提供必须的注册、删除、发布即可

    class Subject
    {
    public:
        virtual ~Subject() = default;
        virtual void registerObserver(Observer*) = 0;   //注册观察者
        virtual void removeObserver(Observer*) = 0;     //移除观察者
        virtual void notifyObservers() = 0;             //通知所有观察者
    };
    

    观察者被动等待主题的数据,所以我们也只提供一个更新接口供主题更新数据

    class Observer
    {
    public:
        virtual ~Observer() = default;
        virtual void update(const std::string& url, const std::string& title, const std::string& desc) = 0;      //更新数据
    };
    

    考虑到每个平台获取到新内容都必定要将其展示出来,而每个平台展示的方式又有所不同,所以我们将其再抽象为一个接口类,观察者需要继承这个类并实现自己的展示方法

    class DisplayElement
    {
    public:
        virtual ~DisplayElement() = default;
        virtual void display() = 0;     //显示数据
    };
    

    下面就开始具体实例的实现吧

    为了保证不会对同一平台重复发送,以及后续可能会对某些平台单独推送内容,我们使用一个哈希表来存储所有入驻的平台

    //主题派生子类
    class ScienceFictionPusher : public Subject
    {
    public:
    	//增加观察者
        void registerObserver(Observer* observer)
        {
            _observers.insert(observer);
        }
    	
    	//删除观察者
        void removeObserver(Observer* observer)
        {
            _observers.erase(observer);
        }
    	
    	//向所有平台推送内容
        void notifyObservers()
        {
            for(const auto& ob : _observers)
            {
                ob->update(_url, _title, _desc);
            }
        }
    	
    	//推送新内容
        void newPush()
        {
            notifyObservers();
        }
    	
    	//设置新内容,当有新内容发布的时候,就会自动推送给所有的平台
        void setNewFiction(const std::string& url, const std::string& title, const std::string& desc)
        {
            _url = url;
            _title = title;
            _desc = desc;
            newPush();
        }
    
    private:
        std::string _url;    //小说链接
        std::string _title;  //小说名
        std::string _desc;   //小说简介
        std::unordered_set<Observer*> _observers;   //入驻的平台
    };
    

    当有新的平台想要入驻的时候,它只需要继承观察者类并实现update接口即可,同时由于我们接收新内容后还需要在自身平台中显示,所以还需要继承发布内容类,并实现display接口

    为了方便注册和删除观察者,我们需要保存一个指向主题的指针

    //观察者派生子类
    class Zhihu : public Observer, public DisplayElement
    {
    public:
        Zhihu(Subject* ScienceFictionPusher)
            : _ScienceFictionPusher(ScienceFictionPusher)
        {
            _ScienceFictionPusher->registerObserver(this);
        }
    
        ~Zhihu()
        {
            _ScienceFictionPusher->removeObserver(this);
        }
    
    	//实现更新接口,让主题主动推送数据
        void update(const std::string& url, const std::string& title, const std::string& desc)
        {
            _url = url;
            _title = title;
            _desc = desc;
    
            display();
        }
    	
    	//在平台中显示推送的内容
        void display()
        {
            std::cout << "知乎每日书籍推荐:" << std::endl;
            std::cout << "链接:" << _url << std::endl;
            std::cout << "标题:" << _title << std::endl;
            std::cout << "简介:" << _desc << "\n" <<std::endl; 
        }
        
    private:
        std::string _url;    //小说链接
        std::string _title;  //小说名
        std::string _desc;   //小说简介
        Subject* _ScienceFictionPusher; //主题对象,方便注册和删除
    };
    

    其他的观察者也类似,为了节省篇幅这里就不多写了,下面写个简单的程序测试一下

    int main()
    {
        ScienceFictionPusher* _subject = new ScienceFictionPusher;
        Douban* douban = new Douban(_subject);
        Zhihu* zhihu = new Zhihu(_subject);
    
        _subject->setNewFiction("www.aaaaaaa.com", "三体", "作品讲述了地球人类文明和三体文明的信息交流、生死搏杀及两个文明在宇宙中的兴衰历程。");
        _subject->setNewFiction("www.bbbbbbb.com", "球形闪电", "描述了一个历经球状闪电的男主角对其历尽艰辛的研究历程,向我们展现了一个独特、神秘而离奇的世界");
    
        delete zhihu;
        delete douban;
        delete _subject;
        
        return 0;
    }
    

    我们添加了知乎和豆瓣两个观察者,并且连续推送了三体和球形闪电这两条内容
    在这里插入图片描述
    可以看到,测试结果没有问题


    总结

    要点

    • 观察者模式定义了对象之间一对多的关系
    • 观察者模式使得我们可以独立地改变主题与观察者,从而使二者之间的依赖关系达致松耦合。
    • 主题发送通知时,需要遍历观察者,因此其知道观察者的存在
    • 观察者自己决定是否需要订阅通知,主题对象对此一无所知。

    应用场景

    观察者模式应该可以说是应用最多、影响最广的模式之一,它通常应用于游戏引擎、GUI、邮件订阅等场景

    场景1 :游戏中的事件监控
    在这里插入图片描述

    例如我们设计了一个RPG游戏,当我们的角色移动到敌人的视野范围时,周围的敌人就会向角色移动并且发起攻击。当我们移动到陷阱的触发位置时,陷阱就会对我们造成伤害。当我们移动到泉水时,泉水又会为角色提供治疗或者BUFF。

    在上面的例子中,我们的角色就是一个主题,而泉水、陷阱、敌人这些就是观察者,当我们做出了某种举动的时候,就会通知它们这些事件的发生,它们就会做出一个具体的响应。这样就能够保证事件实时的同步,以及方便我们进行拓展,后续向增加新事件例如减速的泥潭等内容只需要将其注册为观察者并实现逻辑即可。

    场景2:GUI界面的事件侦听
    在这里插入图片描述
    在GUI界面中,通常有着许多的选项, 而在这些选项背后,通常又有多个负责不同功能的侦听者等待我们的结果,当我们按下这个按钮的时候,就会通知负责这一功能的一系列侦听者响应号召,执行它们各自的工作,这也是一种观察者模式


    生产者-消费者模型 VS 观察者模式

    说到数据的生产和发布、解耦合这两方面,那就难免要提到生产者消费者模型,下面给出它们两个的对比图。

    如果不了解生产者消费者模型的可以参考我的往期博客
    操作系统:生产者消费者模型的两种实现(C++)
    在这里插入图片描述
    在这里插入图片描述
    相同点

    • 主要作用都是解耦合
    • 两者都是行为模式,本质上都是发布-消费两个行为

    不同点

    • 观察者模式是一对多,一条消息可以被多个观察者使用
    • 生产者-消费者模型是多对多的,并且一条消息只能被一个消费者使用
    • 观察者模式可以同步实现,也可以异步实现
    • 生产者消费者模式依赖于交易场所,只能异步实现
    • 观察者模式中主题知道观察者的存在,因为它需要遍历订阅列表发送通知,因此两者之间还是存在微弱的耦合关系
    • 生产者和消费者借助交易场所(中间队列),它们只需要往队列中生成/消费数据,因此不需要知道对方的存在,属于完全解耦

    完整代码及文档

    如果有需要完整代码或者markdown文档的同学可以点击下面的github链接
    github

    展开全文
  • 发布/订阅,类似设计模式中的观察者模式。 redis可以作为pub/sub的服务端。订阅得通过subscribepsubscribe命令向redis server订阅消息类型,redis将消息类型称为channel。 当发布者通过publish命令向redis ...
  • 可以想 zookeeper 等,有时系统需要定时(可插拔)接收或者监听其他服务的动态,这类需求经常见到,那么观察者模式就是做这个的: 一个软件系统里面包含了各种对象,就像一片欣欣向荣的森林充满了各种生物一样。...
  • 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式; 连接流计算任务数据; 用于将消息广播给大量接收者。 简单的说,我们在单体应用里面需要用队列解决的问题,在分布式系统中大多都可以用消息队列来解决...
  • work queues也成为task queues,任务模型。当消息处理比较耗时的时候,可能生产消息的速度远远大于消费速度,长此以往,消息就会堆积,...这种模型和我们之前提到的hello word直连简单模型非常相似,只是消费者从一个.
  • 在今天我刚阅读了《kafka权威指南》的第一章,关于介绍kafka这个消息发布订阅系统的概念。在之前的一份实习中,我曾经接触到了kafka,但是我一知半解,也没有深入的了解为什么会有kafka的存在。恰巧前几天我在市...
  • **观察者模式(Observer Pattern)又称为`发布/订阅(Publish/Subscribe)模式,`在对象之间定义了一对多的依赖关系,当一个对象改变状态,依赖它的对象会收到通知并自动更新.** - 观察者模式无外乎2个操作,观察者订阅...
  • 生产者-消费者模式:用流水线思想提高效率1. 生产者 - 消费者模式的优点2. 支持批量执行以提升性能3. 支持分阶段提交以提升性能4. 总结   前面我们在《Worker Thread模式:工作线程池》中讲到,Worker Thread 模式...
  • 在此,发生改变的对象称为观察目标,而被通知的对象称为观察者,一个观察目标可以对应多个观察者,而且这些观察者之间没有相互联系,可以根据需要增加删除观察者,使得系统更易于扩展,当对象间存在一对多关系时,...
  • Java中观察者模式与委托的对比

    千次阅读 多人点赞 2022-05-10 19:19:44
    Java中观察者模式与委托的对比
  • 观察者模式的别名包括发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式 或从属者(Dependents)模式。观察者模式是一种对象行为型模式。 在观察者模式中,发生...
  • 前两个的模型结构不同,在发布订阅模型中多了一个X(exchange),exchange是一个交换机,生产者不是直接将消息发送给队列,而是先发送给交换机。消费者可以通过队列去订阅这个交换机,每个消费者对应于自己的一个...
  • EventDispatcher在Nacos中是一个事件发布订阅的类,也就是我们经常使用的Java设计模式——事件驱动模式(观察者模式) 一般发布订阅主要有三个角色 事件: 表示某些类型的事件动作,例如Nacos中的 本地数据发生...
  • 观察者模式也叫发布订阅模式,就是在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。 一般情况来说,被依赖的对象叫做被观察者,依赖的对象叫做观察者。 行为型的设计...
  • 分布式通信:发布订阅

    万次阅读 2021-12-25 13:22:31
    Kafka 发布订阅原理及工作机制分区和消费组的原理和作用BrokerConsumer发布订阅实践应用知识扩展:观察者模式和发布订阅模式的区别是什么?总结 前言 分布式通信中的远程调用的核心是在网络服务层封装了通信协议、...
  • 设计模式之【观察者模式】

    千次阅读 2022-04-04 12:05:21
    什么是观察者模式?观察者模式有什么优缺点?在开源代码中是如何使用的?让我们一起学习设计模式之【观察者模式】。
  •  (2) 观察者模式在观察目标和观察者之间建立一个抽象的耦合。  观察目标只需要维持一个抽象观察者的集合,无须了解其具体观察者。  由于观察目标和观察者没有紧密地耦合在一起,因此它们可以属于不同的抽象化层次...
  • 生产者-消费者模式在编程领域的应用也非常广泛,前面我们曾经提到,Java线程池本质上就是用生产者-消费者模式实现的,所以每当使用线程池的时候,其实就是在应用生产者-消费者模式。 当然,除了在线程池中的应用,...
  • Python观察者模式

    2020-12-04 10:47:25
    标签: python 设计模式 观察...HF里面对设计模式讲的确实很通俗,按照他的说,观察者模式可以用报纸出版商报纸订阅者之间的关系来描述:报社出版报纸,可以将它看做一个生产者,报纸订阅订阅报纸,可以将他看做...
  • 生产者-消费者模式在编程领域的应用也非常广泛,前面我们曾经提到,Java线程池本质上就是用生产者-消费者模式实现的,所以每当使用线程池的时候,其实就是在应用生产者-消费者模式。 当然,除了在线程池...
  • 设计模式-观察者模式

    2021-12-21 11:39:02
    消息队列(MQ),一种能实现生产者消费者单向通信的通信模型,这也是现在常用的主流中间件。 常见有 RabbitMQ、ActiveMQ、Kafka等 他们的特点也有很多 比如 解偶、异步、广播、削峰 等等多种优势特点。 在设计模式...
  • 文章目录设计模式: Observer 观察者模式简介从 MVC 到 MVVM参考完整示例代码正文场景模式结构代码示例Subject 可订阅对象/主题Observer 观察者测试代码结语 简介 目的 创建型 结构型 行为型 类 Factory ...
  • 当一个对象(被观察者/发布者)的状态发生改变时,所有依赖它的对象都将得到通知并更新。又有别名为发布-订阅(publish-subscribe)。 1.2应用场景 一个抽象模型有两个方面,其中一个方面依赖于另一方面。将这二者...
  • 最近在看springcloud的熔断机制的实现,发现底层使用的rxjava实现,就看了下rxjava的使用,发现rxjava使用可也便捷实现前面讲解的定时生产消费。 二、rxjava版生产消费实现 在简单抽象下要实现的功能,...
  • 阿里P6+面试:介绍下观察者模式?

    万次阅读 多人点赞 2021-06-03 00:03:19
    消息队列(MQ),一种能实现生产者消费者单向通信的通信模型,这也是现在常用的主流中间件。常见有 RabbitMQ、ActiveMQ、Kafka等 他们的特点也有很多 比如 解偶、异步、广播、削峰 等等多种优势特点。 在设计模式...
  • 设计模式 | 观察者模式及典型应用

    千次阅读 2018-10-24 01:05:45
    本文主要内容: ...JDK 委托事件模型DEM中的观察者模式 Spring ApplicationContext 事件机制中的观察者模式 观察者模式 观察者模式是设计模式中的 “超级模式”,其应用随处可见,我们以微信公众号...
  • 胡说八道设计模式—观察者模式

    万次阅读 2020-12-03 21:46:04
    观察者模式(Observer Design Pattern)也被称为发布订阅模式(Publish-Subscribe Design Pattern) 在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 9,439
精华内容 3,775
关键字:

观察者模型和发布订阅模型生产者消费者