精华内容
下载资源
问答
  • EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。  ...

    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。

      Observer模式是比较常用的设计模式之一,虽然有时候在具体代码里,它不一定叫这个名字,比如改头换面叫个Listener,但模式就是这个模式。手工实现一个Observer也不是多复杂的一件事,只是因为这个设计模式实在太常用了,Java就把它放到了JDK里面:Observable和Observer,从JDK 1.0里,它们就一直在那里。从某种程度上说,它简化了Observer模式的开发,至少我们不用再手工维护自己的Observer列表了。不过,如前所述,JDK里的Observer从1.0就在那里了,直到Java 7,它都没有什么改变,就连通知的参数还是Object类型。要知道,Java 5就已经泛型了。Java 5是一次大规模的语法调整,许多程序库从那开始重新设计了API,使其更简洁易用。当然,那些不做应对的程序库,多半也就过时了。这也就是这里要讨论知识更新的原因所在。今天,对于普通的应用,如果要使用Observer模式该如何做呢?答案是Guava的EventBus。

      EventBus基本用法:

      使用Guava之后, 如果要订阅消息, 就不用再继承指定的接口, 只需要在指定的方法上加上@Subscribe注解即可。代码如下:

      消息封装类:

    复制代码

    public class TestEvent {
        private final int message;
        public TestEvent(int message) {        
            this.message = message;
            System.out.println("event message:"+message);
        }
        public int getMessage() {
            return message;
        }
    }

    复制代码

      消息接受类:

    复制代码

    public class EventListener {
        public int lastMessage = 0;
    
        @Subscribe
        public void listen(TestEvent event) {
            lastMessage = event.getMessage();
            System.out.println("Message:"+lastMessage);
        }
    
        public int getLastMessage() {      
            return lastMessage;
        }
    }

    复制代码

      测试类及输出结果:

    复制代码

    public class TestEventBus {
        @Test
        public void testReceiveEvent() throws Exception {
    
            EventBus eventBus = new EventBus("test");
            EventListener listener = new EventListener();
    
            eventBus.register(listener);
    
            eventBus.post(new TestEvent(200));
            eventBus.post(new TestEvent(300));
            eventBus.post(new TestEvent(400));
    
            System.out.println("LastMessage:"+listener.getLastMessage());
            ;
        }
    }
    
    //输出信息
    event message:200
    Message:200
    event message:300
    Message:300
    event message:400
    Message:400
    LastMessage:400

    复制代码

       MultiListener的使用:

      只需要在要订阅消息的方法上加上@Subscribe注解即可实现对多个消息的订阅,代码如下:

    复制代码

    public class MultipleListener {
        public Integer lastInteger;  
        public Long lastLong;  
       
        @Subscribe  
        public void listenInteger(Integer event) {  
            lastInteger = event; 
            System.out.println("event Integer:"+lastInteger);
        }  
       
        @Subscribe  
        public void listenLong(Long event) {  
            lastLong = event; 
            System.out.println("event Long:"+lastLong);
        }  
       
        public Integer getLastInteger() {  
            return lastInteger;  
        }  
       
        public Long getLastLong() {  
            return lastLong;  
        }  
    }

    复制代码

      测试类:

    复制代码

    public class TestMultipleEvents {
        @Test  
        public void testMultipleEvents() throws Exception {  
           
            EventBus eventBus = new EventBus("test");  
            MultipleListener multiListener = new MultipleListener();  
           
            eventBus.register(multiListener);  
           
            eventBus.post(new Integer(100));
            eventBus.post(new Integer(200));  
            eventBus.post(new Integer(300));  
            eventBus.post(new Long(800)); 
            eventBus.post(new Long(800990));  
            eventBus.post(new Long(800882934));  
           
            System.out.println("LastInteger:"+multiListener.getLastInteger());
            System.out.println("LastLong:"+multiListener.getLastLong());
        }   
    }
    
    //输出信息
    event Integer:100
    event Integer:200
    event Integer:300
    event Long:800
    event Long:800990
    event Long:800882934
    LastInteger:300
    LastLong:800882934

    复制代码

      Dead Event:

      如果EventBus发送的消息都不是订阅者关心的称之为Dead Event。实例如下:

    复制代码

    public class DeadEventListener {
        boolean notDelivered = false;  
           
        @Subscribe  
        public void listen(DeadEvent event) {  
            
            notDelivered = true;  
        }  
       
        public boolean isNotDelivered() {  
            return notDelivered;  
        }  
    }

    复制代码

      测试类:

    复制代码

    public class TestDeadEventListeners {
        @Test  
        public void testDeadEventListeners() throws Exception {  
           
            EventBus eventBus = new EventBus("test");               
            DeadEventListener deadEventListener = new DeadEventListener();  
            eventBus.register(deadEventListener);  
    
            eventBus.post(new TestEvent(200));         
            eventBus.post(new TestEvent(300));        
           
            System.out.println("deadEvent:"+deadEventListener.isNotDelivered());
    
        }  
    }
    
    //输出信息
    event message:200
    event message:300
    deadEvent:true

    复制代码

      说明:如果没有消息订阅者监听消息, EventBus将发送DeadEvent消息,这时我们可以通过log的方式来记录这种状态。

      Event的继承:

      如果Listener A监听Event A, 而Event A有一个子类Event B, 此时Listener A将同时接收Event A和B消息,实例如下:

      Listener 类:

    复制代码

    public class NumberListener {  
           
        private Number lastMessage;  
       
        @Subscribe  
        public void listen(Number integer) {  
            lastMessage = integer; 
            System.out.println("Message:"+lastMessage);
        }  
       
        public Number getLastMessage() {  
            return lastMessage;  
        }  
    }  
    
    public class IntegerListener {  
           
        private Integer lastMessage;  
       
        @Subscribe  
        public void listen(Integer integer) {  
            lastMessage = integer; 
            System.out.println("Message:"+lastMessage);
        }  
       
        public Integer getLastMessage() {  
            return lastMessage;  
        }  
    }  

    复制代码

      测试类:

    复制代码

    public class TestEventsFromSubclass {
        @Test  
        public void testEventsFromSubclass() throws Exception {  
           
            EventBus eventBus = new EventBus("test");  
            IntegerListener integerListener = new IntegerListener();  
            NumberListener numberListener = new NumberListener();  
            eventBus.register(integerListener);  
            eventBus.register(numberListener);  
           
            eventBus.post(new Integer(100));  
           
            System.out.println("integerListener message:"+integerListener.getLastMessage());
            System.out.println("numberListener message:"+numberListener.getLastMessage());
                  
            eventBus.post(new Long(200L));  
           
            System.out.println("integerListener message:"+integerListener.getLastMessage());
            System.out.println("numberListener message:"+numberListener.getLastMessage());        
        }  
    }
    
    //输出类
    Message:100
    Message:100
    integerListener message:100
    numberListener message:100
    Message:200
    integerListener message:100
    numberListener message:200

    复制代码

      说明:在这个方法中,我们看到第一个事件(新的整数(100))是收到两个听众,但第二个(新长(200 l))只能到达NumberListener作为整数一不是创建这种类型的事件。可以使用此功能来创建更通用的监听器监听一个广泛的事件和更详细的具体的特殊的事件。

       一个综合实例:

    复制代码

    public class UserThread extends Thread {
        private Socket connection;
        private EventBus channel;
        private BufferedReader in;
        private PrintWriter out;
    
        public UserThread(Socket connection, EventBus channel) {
            this.connection = connection;
            this.channel = channel;
            try {
                in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
                out = new PrintWriter(connection.getOutputStream(), true);
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    
        @Subscribe
        public void recieveMessage(String message) {
            if (out != null) {
                out.println(message);
                System.out.println("recieveMessage:"+message);
            }
        }
    
        @Override
        public void run() {
            try {
                String input;
                while ((input = in.readLine()) != null) {
                    channel.post(input);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            
            //reached eof
            channel.unregister(this);
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            in = null;
            out = null;
        }
    }

    复制代码

    复制代码

    mport java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    import com.google.common.eventbus.EventBus;
    
    public class EventBusChat {
        public static void main(String[] args) {
            EventBus channel = new EventBus();
            ServerSocket socket;
            try {
                socket = new ServerSocket(4444);
                while (true) {
                    Socket connection = socket.accept();
                    UserThread newUser = new UserThread(connection, channel);
                    channel.register(newUser);
                    newUser.start();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    复制代码

      说明:用telnet命令登录:telnet 127.0.0.1 4444 ,如果你连接多个实例你会看到任何消息发送被传送到其他实例。

    展开全文
  • 感觉上类似设计模式中的观察者模式前面的简单模式Work模式。生产者的每一条消息都是被多个消费者中的一个消费掉。现在的publish/subscribe发布订阅模式。生产者的一条消息,将被多个消费者接收。RabbitMQ消息传递...

    RabbitMQ的使用二_Java Client方式使用发布/订阅模式

    1.发布订阅模式:使用了Fanout交换机。它是将它接收到的所有消息广播到它知道的所有队列。感觉上类似设计模式中的观察者模式

    前面的简单模式和Work模式。生产者的每一条消息都是被多个消费者中的一个消费掉。现在的publish/subscribe发布订阅模式。生产者的一条消息,将被多个消费者接收。

    RabbitMQ消息传递模型的核心思想是:生产者永远不会直接向队列发送任何消息。实际上,生产者常常根本不知道消息是否会被传递到任何队列。相反,生产者只能将消息发送到交换机。

    交换机的职责:一方面,它接收来自生产者的消息,另一方面,它将消息推入队列。交换机必须明确地知道如何处理接收到的消息。

    b0dff8614c599e2e9b8bbb52d068a271.pngP:表示生产者  X:表示交换机   交换机连着两个队列

    交换机有direct、topic、 headers、fanout四种

    发布订阅模式使用的是fanout交换机

    channel.exchangeDeclare(交换机的名称, "fanout");

    前面的例子中,使用“”来表示交换机。这是使用的默认的交换机。

    channel.basicPublish("", WORK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, sendMsg.getBytes());

    消息将使用routingKey指定的名称路由到队列

    2.临时队列

    临时队列:1.有一个随机的队列名称。

    2.一旦断开使用者的连接,队列应该被自动删除。

    在Java客户端中,当我们不为queueDeclare()提供参数时,我们会创建一个非持久的、排他的、自动删除队列,并生成一个名称

    String queueName = channel.queueDeclare().getQueue();

    交换机和队列需要绑定起来,可通过如下方式:

    channel.queueBind(queueName, 交换机的名称, "");

    交换机的图示:

    2ed9e8eced9a91ce5d6e04f9987b1155.png

    a6081da8d946c2fdb26c83bf6da4b020.png

    代码实现如下:

    创建一个生产者。

    创建一个Fanout交换机

    创建2个临时队列,然后和Fanout交换机绑定起来,用户验证发布订阅模式。

    创建1个普通队列。给这个队列添加2个消费者。用来实现WORK模式

    1.创建一个生产者

    8f900a89c6347c561fdf2122f13be562.png

    961ddebeb323a10fe0623af514929fc1.png

    public classFanoutExchangeSender {//1.声明一个交换机

    private static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throwsIOException, TimeoutException {//1.获取一个链接

    Connection rabbitMQConnections =RabbitMQConnectionFactory.getRabbitMQConnections();//2.获取一个通道

    Channel channel =rabbitMQConnections.createChannel();//3.声明一个Fanout交换机

    channel.exchangeDeclare(FANOUT_EXCHANGE_NAME, "fanout");//发送数据到交换机中

    for (int i = 1; i <= 100; i++) {

    String msg= new Date() + "小河流水哗啦啦============>" +i;//发送消息//第一个参数表示交换机的名称 第二个参数表示routingKey 交换机是通过routingKey进行选择队列的。这里使用的交换机是fanout交换机,所以routingKey不管用。所以不配置。

    channel.basicPublish(FANOUT_EXCHANGE_NAME, "", null, msg.getBytes());

    }

    channel.close();

    rabbitMQConnections.close();

    System.out.println(new Date() + "消息发行成功");

    }

    }

    View Code

    2.创建消费者1号

    8f900a89c6347c561fdf2122f13be562.png

    961ddebeb323a10fe0623af514929fc1.png

    public classFanoutExchangeReceiver {private static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throwsIOException, TimeoutException {

    Connection rabbitMQConnections=RabbitMQConnectionFactory.getRabbitMQConnections();

    Channel channel=rabbitMQConnections.createChannel();//声明一个交换机

    channel.exchangeDeclare(FANOUT_EXCHANGE_NAME,"fanout");//声明一个临时队列

    String queueName =channel.queueDeclare().getQueue();

    System.out.println("临时队列的名称"+queueName);//将队列和交换机绑定起来

    channel.queueBind(queueName,FANOUT_EXCHANGE_NAME,"");

    System.out.println("==========队列绑定了交换机====================");//消费回调

    DeliverCallback deliverCallback = newDeliverCallback() {

    @Overridepublic void handle(String consumerTag, Delivery delivery) throwsIOException {

    String receiverMessage= new String(delivery.getBody(),"UTF-8");

    System.out.println(new Date()+"1号队列接收到的消息=======>"+receiverMessage);//手动提供一个应答

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

    }

    };

    channel.basicConsume(queueName,false,deliverCallback,consumerTag ->{});

    }

    }

    View Code

    3.创建消费者2号

    8f900a89c6347c561fdf2122f13be562.png

    961ddebeb323a10fe0623af514929fc1.png

    public classFanoutExchangeReceiver2 {private static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throwsIOException, TimeoutException {

    Connection rabbitMQConnections=RabbitMQConnectionFactory.getRabbitMQConnections();

    Channel channel=rabbitMQConnections.createChannel();//声明一个交换机

    channel.exchangeDeclare(FANOUT_EXCHANGE_NAME,"fanout");//声明一个临时队列

    String queueName =channel.queueDeclare().getQueue();

    System.out.println("临时队列的名称"+queueName);//将队列和交换机绑定起来

    channel.queueBind(queueName,FANOUT_EXCHANGE_NAME,"");

    System.out.println("==========队列绑定了交换机====================");//消费回调

    DeliverCallback deliverCallback = newDeliverCallback() {

    @Overridepublic void handle(String consumerTag, Delivery delivery) throwsIOException {

    String receiverMessage= new String(delivery.getBody(),"UTF-8");

    System.out.println(new Date()+"2号队列接收到的消息=======>"+receiverMessage);//手动提供一个应答

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

    }

    };

    channel.basicConsume(queueName,false,deliverCallback,consumerTag ->{});

    }

    }

    View Code

    4.创建消费者3号

    8f900a89c6347c561fdf2122f13be562.png

    961ddebeb323a10fe0623af514929fc1.png

    public classFanoutExchangeReceiver3 {private static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";private static final String FANOUT_EXCHANGE_QUEUE_NAME = "fanout_exchange_queue_name";public static void main(String[] args) throwsIOException, TimeoutException {

    Connection rabbitMQConnections=RabbitMQConnectionFactory.getRabbitMQConnections();

    Channel channel=rabbitMQConnections.createChannel();

    System.out.println("队列名称"+FANOUT_EXCHANGE_QUEUE_NAME);//声明一个交换机

    channel.exchangeDeclare(FANOUT_EXCHANGE_NAME,"fanout");//声明一个队列

    channel.queueDeclare(FANOUT_EXCHANGE_QUEUE_NAME,false,false,false,null);//将队列和交换机绑定起来

    channel.queueBind(FANOUT_EXCHANGE_QUEUE_NAME,FANOUT_EXCHANGE_NAME,"");

    System.out.println("==========队列绑定了交换机====================");//消费回调

    DeliverCallback deliverCallback = newDeliverCallback() {

    @Overridepublic void handle(String consumerTag, Delivery delivery) throwsIOException {

    String receiverMessage= new String(delivery.getBody(),"UTF-8");

    System.out.println(new Date()+"3号队列接收到的消息=======>"+receiverMessage);//手动提供一个应答

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

    }

    };

    channel.basicConsume(FANOUT_EXCHANGE_QUEUE_NAME,false,deliverCallback,consumerTag ->{});

    }

    }

    View Code

    5.创建消费者4号

    8f900a89c6347c561fdf2122f13be562.png

    961ddebeb323a10fe0623af514929fc1.png

    public classFanoutExchangeReceiver4 {private static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";private static final String FANOUT_EXCHANGE_QUEUE_NAME = "fanout_exchange_queue_name";public static void main(String[] args) throwsIOException, TimeoutException {

    Connection rabbitMQConnections=RabbitMQConnectionFactory.getRabbitMQConnections();

    Channel channel=rabbitMQConnections.createChannel();

    System.out.println("队列名称"+FANOUT_EXCHANGE_QUEUE_NAME);//声明一个交换机

    channel.exchangeDeclare(FANOUT_EXCHANGE_NAME,"fanout");//声明一个队列

    channel.queueDeclare(FANOUT_EXCHANGE_QUEUE_NAME,false,false,false,null);//将队列和交换机绑定起来

    channel.queueBind(FANOUT_EXCHANGE_QUEUE_NAME,FANOUT_EXCHANGE_NAME,"");

    System.out.println("==========队列绑定了交换机====================");//消费回调

    DeliverCallback deliverCallback = newDeliverCallback() {

    @Overridepublic void handle(String consumerTag, Delivery delivery) throwsIOException {

    String receiverMessage= new String(delivery.getBody(),"UTF-8");

    System.out.println(new Date()+"4号队列接收到的消息=======>"+receiverMessage);//手动提供一个应答

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

    }

    };

    channel.basicConsume(FANOUT_EXCHANGE_QUEUE_NAME,false,deliverCallback,consumerTag ->{});

    }

    }

    View Code

    生产者发送消息到交换机,消费者受到消息如下:

    d5974a64e2c632d5311f6ede4666ca4f.png

    图一:消费者1号

    0b9a1e97555586f4121b6d6c57f20820.png

    图二:消费者2号

    由上面图一和图二,可以看出,2个队列的名字,都是Rabbitmq创建的临时队列的名字。由于交换机是Fanout模式,故每一个消费者,收到的消息数和生产者发送的消息数一样。

    226353cc0ccf4b83e1a400bf31ed5312.png

    图三:消费者3号

    59ae7ddd32673e0121f7e12a5daed578.png

    图四 消费者4号

    上面图三和图四,使用的是同一个队列。但是有2个不同的消费者,这是WORK模式。

    可以明显的看出。消息号为奇数的由3号消费者消费了。消息数为偶数的由4号消费者消费了。使用了轮询的方式

    展开全文
  • EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。  ...

     EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。

      Observer模式是比较常用的设计模式之一,虽然有时候在具体代码里,它不一定叫这个名字,比如改头换面叫个Listener,但模式就是这个模式。手工实现一个Observer也不是多复杂的一件事,只是因为这个设计模式实在太常用了,Java就把它放到了JDK里面:Observable和Observer,从JDK 1.0里,它们就一直在那里。从某种程度上说,它简化了Observer模式的开发,至少我们不用再手工维护自己的Observer列表了。不过,如前所述,JDK里的Observer从1.0就在那里了,直到Java 7,它都没有什么改变,就连通知的参数还是Object类型。要知道,Java 5就已经泛型了。Java 5是一次大规模的语法调整,许多程序库从那开始重新设计了API,使其更简洁易用。当然,那些不做应对的程序库,多半也就过时了。这也就是这里要讨论知识更新的原因所在。今天,对于普通的应用,如果要使用Observer模式该如何做呢?答案是Guava的EventBus。

    EventBus使用起来分订阅、注册、发布、取消注册等步骤:

    在订阅者类中实现各种事件的订阅函数
    public void onEvent(AnyEventType event) {}
    把该订阅类注册到Bus中
    eventBus.register(this);
    向Bus发布事件
    eventBus.post(event);
    不需要的时候 取消订阅事件
    eventBus.unregister(this);


     EventBus主要有3点:
    1. 事件订阅函数不是基于注解(Annotation)的,而是基于命名约定的,在Android 4.0之前的版本中,注解解析起来比较慢 , 事件响应函数默认以“onEvent”开始,可以在EventBus中修改这个值,但是不推荐这么干
    2. 事件响应有更多的线程选择
    EventBus可以向不同的线程中发布事件,在ThreadMode 枚举中定义了4个线程,只需要在事件响应函数名称“onEvent”后面添加对应的线程类型名称,则还事件响应函数就会在对应的线程中执行,比如事件函数“onEventAsync”就会在另外一个异步线程中执行,ThreadMode定义的4个线程类型如下:
    PostThread:事件响应函数和事件发布在同一线程中执行。这个是默认值,这样可以避免线程切换。
    MainThread:事件响应函数会在Android应用的主线程(大部分情况下都是UI线程)中执行。
    BackgroundThread:事件响应函数会在一个后台线程中执行。如果事件发布函数不是在主线程中,则会立即在事件发布线程中执行响应函数。如果事件发布函数在主线程中,EventBus则会在唯一的一个后台线程中按照顺序来执行所有的后台事件响应函数。

    上面的3种事件响应函数,应该能够很快的执行完,不然的话会阻塞各自的事件发布。

    Async:事件响应函数在另外一个异步线程中执行。该线程和发布线程、主线程相互独立。如果事件响应函数需要较长的时间来执行,则应该使用该模式,例如 网络访问等。需要注意的是,由于系统并行的限制,应该避免在同一时间触发大量的异步线程。 EventBus使用一个线程池来提高线程的效率。


    3. EventBus支持 Sticky Event

    有时候某个事件可能会用到多次,比如在前面介绍Event Bus模型一文的示例中,最新的位置更新信息,可能需要多次用到,真对这种情况,您可以把该事件发布为Sticky Event,然后,当需要查询该信息的时候,可以通过Bus的getStickyEvent(ClasseventType) 函数来查询最新发布的Event对象。
    同一类型的事件只保存最新的Event对象。
    注册和发布事件的函数分别为 registerSticky(…) 和 postSticky(Object event)




    展开全文
  • 系统设计 消息队列

    2020-03-05 12:25:04
    消息模型 点对点 消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。 ...而在发布订阅模式中,生产者消费者不知道对方的存在,它们之间通过频道进行通信。 观察者模式...

    可以把消息队列比作是一个存放消息的容器,当需要使用消息的时候可以取出消息使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能、削峰、降低系统耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ。

    队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。但是偶尔也会出现消息被消费的顺序不对的情况,一定要保证消息被消费的顺序正确。

    消息队列的优点

    1. 通过异步处理提高系统性能(削峰、减少响应所需时间)

    消息队列具有很好的削峰作用的功能,即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。 举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。

    因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。

    2. 降低系统耦合度

    如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合, 这显然也提高了系统的扩展性。

    消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。 消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。

    消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。

    另外为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。

    : 不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),比较常用的是发布-订阅模式。 另外,这两种消息模型是 JMS (Java Message Service,Java消息服务) 提供的,AMQP 协议还提供了 5 种消息模型。

    使用消息队列带来的问题

    • 系统可用性降低: 系统可用性在某种程度上降低。在引入 MQ 之前,不用考虑消息丢失或者 MQ 挂掉等情况。但是引入 MQ 之后就需要去考虑。
    • 系统复杂性提高: 引入 MQ 之后,需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题。
    • 一致性问题: 消息队列带来的异步确实可以提高系统响应速度。但是如果消息的真正消费者并没有正确消费消息就会导致数据不一致。

    消息模型

    点对点

    消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。

    img

    发布 / 订阅 (现代观察者,非传统观察者模式)

    消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费。

    img

    发布与订阅模式和观察者模式有以下不同:

    • 观察者模式中,观察者和主题都知道对方的存在;而在发布与订阅模式中,生产者与消费者不知道对方的存在,它们之间通过频道进行通信。
    • 观察者模式是同步的,当事件触发时,主题会调用观察者的方法,然后等待方法返回;而发布与订阅模式是异步的,生产者向频道发送一个消息之后,就不需要关心消费者何时去订阅这个消息,可以立即返回。

    img

    使用场景

    异步处理

    发送者将消息发送给消息队列之后,不需要同步等待消息接收者处理完毕,而是立即返回进行其它操作。消息接收者从消息队列中订阅消息之后异步处理。

    例如在注册流程中通常需要发送验证邮件来确保注册用户身份的合法性,可以使用消息队列使发送验证邮件的操作异步处理,用户在填写完注册信息之后就可以完成注册,而将发送验证邮件这一消息发送到消息队列中。

    只有在业务流程允许异步处理的情况下才能这么做,例如上面的注册流程中,如果要求用户对验证邮件进行点击之后才能完成注册的话,就不能再使用消息队列。

    流量削锋

    在高并发的场景下,如果短时间有大量的请求到达会压垮服务器。

    可以将请求发送到消息队列中,服务器按照其处理能力从消息队列中订阅消息进行处理。

    应用解耦

    如果模块之间不直接进行调用,模块之间耦合度就会很低,那么修改一个模块或者新增一个模块对其它模块的影响会很小,从而实现可扩展性。

    通过使用消息队列,一个模块只需要向消息队列中发送消息,其它模块可以选择性地从消息队列中订阅消息从而完成调用。

    可靠性

    发送端的可靠性

    发送端完成操作后一定能将消息成功发送到消息队列中。

    实现方法:在本地数据库建一张消息表,将消息数据与业务数据保存在同一数据库实例里,这样就可以利用本地数据库的事务机制。事务提交成功后,将消息表中的消息转移到消息队列中,若转移消息成功则删除消息表中的数据,否则继续重传。

    本地消息表:

    本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。

    1. 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。
    2. 之后将本地消息表中的消息转发到消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。
    3. 在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作。

    img

    接收端的可靠性

    接收端能够从消息队列成功消费一次消息。

    两种实现方法:

    • 保证接收端处理消息的业务逻辑具有幂等性:只要具有幂等性,那么消费多少次消息,最后处理的结果都是一样的。
    • 保证消息具有唯一编号,并使用一张日志表来记录已经消费的消息编号。
    展开全文
  • 消息队列

    千次阅读 2020-07-03 19:34:36
    而在发布订阅模式中,生产者消费者不知道对方的存在,它们之间通过频道进行通信。 观察者模式是同步的,当事件触发时,主题会调用观察者的方法,然后等待方法返回;而发布订阅模式是异步的,生产者向频道发送...
  • 一文学会消息队列

    多人点赞 热门讨论 2020-09-11 17:09:20
    一、消息模型 点对点 发布/订阅 二、使用场景 异步处理 流量削锋 ...一、消息模型 ...消息生产者向消息队列中发送了一个消息之后,只能被...而在发布订阅模式中,生产者消费者不知道对方的存在,它们之间通过频道进..
  • 你还不懂消息队列

    2021-03-04 00:31:34
    而在发布订阅模式中,生产者消费者不知道对方的存在,它们之间通过频道进行通信。 观察者模式是同步的,当事件触发时,主题会调用观察者的方法,然后等待方法返回;而发布订阅模式是异步的,生产者向频道发送...
  • 一、消息模型 1.点对点 消息生产者向消息队列中发送了一个消息之后,只能被一个...而在发布订阅模式中,生产者消费者不知道对方的存在,它们之间通过频道进行通信。 观察者模式是同步的,当事件触发时,主题会调...
  • 一、消息模型 点对点 消息生产者向消息队列中发送了一个消息之后,只能被...而在发布订阅模式中,生产者消费者不知道对方的存在,它们之间通过频道进行通信。 观察者模式是同步的,当事件触发时,主题会调用观察...
  • 消息队列--RabbitMQ

    2019-06-27 15:26:07
    一、消息模型 1.点对点 消息生产者向消息队列中发送了一个消息之后,只能被一...而在发布订阅模式中,生产者消费者不知道对方的存在,它们之间通过频道进行通信。观察者模式是同步的,当事件触发时,主题会调用...
  • 一、消息模型 点对点 发布/订阅 二、使用场景 异步处理 流量削锋 ...一、消息模型 ...消息生产者向消息队列中发送了一个消息...而在发布订阅模式中,生产者消费者不知道对方的存在,它们之间通过频道进行通信。 观..
  • 消息队列从入门到精通

    万次阅读 2018-10-14 09:31:40
    一、消息模型 点对点 发布/订阅 二、使用场景 异步处理 流量削锋 ...一、消息模型 ...消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。...发布订阅模式和观察者模式有以下不同: 观察...
  • 消息队列中间件

    2018-08-20 19:05:25
    一、消息模型 点对点 发布/订阅 二、使用场景 异步处理 流量削锋 应用解耦 三、可靠性 发送端的可靠性 接收端的可靠性 ...一、消息模型 ...消息生产者向消息队列中发送了一...发布订阅模式和观察者模式...
  • Guava EventBus 使用

    千次阅读 2020-01-19 16:07:59
    EventBus 是 Google Guava 提供的消息发布-订阅类库,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现,消息通知负责人通过 EventBus 去注册/注销观察者,最后由消息通知负责人给观察者发布消息。...
  • EventBus 学习笔记

    2016-07-18 15:48:51
    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。
  • Guava之EventBus实战

    2020-04-30 15:29:07
    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。 定义...
  • EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现,在应用中可以处理一些异步任务。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建...
  • EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。 ...
  • EventBus使用

    2018-12-15 22:23:56
    EventBus是由谷歌提供的一种发布订阅的库,它很好的实现了观察者模式,EventBus是Guava的事件处理机制(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们...
  • Guava EventBus 源码分析

    2019-06-08 13:40:24
    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。 EventBus是一个非常优雅简单解决方案,我们不用创建复杂的类接口层次结构。 传统上,Java的进程内事件分发都是...
  • Guava EventBus

    2018-12-17 20:15:37
    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。 1. ...
  • guava之eventbus

    2020-07-19 12:19:53
    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。 2,...
  • Java-类库-Guava-EventBus

    2016-07-02 18:11:22
    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。  ...
  • 一、简述EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现,在应用中可以处理一些异步任务。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用...
  • Guava学习笔记:EventBus

    2015-08-18 16:02:10
    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。  ...
  • Guava----EventBus

    2019-05-29 15:42:00
    EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。  ...
  • EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。  ...

空空如也

空空如也

1 2 3
收藏数 59
精华内容 23
关键字:

观察者模型和发布订阅模型生产者消费者