精华内容
下载资源
问答
  • 本文实例讲述了Android编程实现异步消息处理机制的几种方法。分享给大家供大家参考,具体如下: 1、概述 Android需要更新ui的话就必须在ui线程上进行操作。否则就会抛异常。 假如有耗时操作,比如:在子线程中下载...
  • Java消息服务(JMS)是用于编写使用异步消息传递的JEE应用程序的API。传统的使用JMSAPI进行消息传递的实现包括多个步骤,例如JNDI查询队列连接工厂和Queue资源,在实际发送和接收消息前创建一个JMS会话。Spring框架则...
  • C++封装实现的异步加锁消息队列,支持多线程,完美封装,可用于消息接收、处理
  • NULL 博文链接:https://hangyu608.iteye.com/blog/990924
  • Android中的异步消息机制分为四个部分:Message、Handler、MessageQueue和Looper。 其中,Message是线程之间传递的消息,其what、arg1、arg2字段可以携带整型数据,obj字段可以携带一个Object对象。 Handler是处理者...
  • 异步消息实战

    2017-07-21 20:58:49
    异步消息实战
  • Errai是JBoss开发的一个基于GWT的框架,使用下一代WEB技术,用于构建富客户端应用。该框架构建在ErraiBus基础上,为客户端与服务器的异步消息传递提供了一个真正统一的消息基础设施。 标签:Errai
  • AsyncMessageHandle 基于HttpUrlConnection和OkHttp 进行封装的网络框架,支持多文件和大数据量的上传
  • 味精 RAS-MSG是可靠的异步消息系统,可确保分布式系统的最终一致性。 设计文件:\ doc 源代码:\ lib 系统示例:\ examples 测试一下
  • C# socketUdp 异步 消息发送接收
  • 一、RocketMQ 支持 3 种消息发送方式 : ...2、异步消息(async message) producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消息线程不阻塞 ,...

    一、RocketMQ 支持 3 种消息发送方式 :

    1、同步消息(sync message )

    producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果 。

    2、异步消息(async message)

    producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。

    3、单向消息(oneway message)

    producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。

    二、RocketMQ消息结构

    RocketMQ的消息包括基础属性和扩展属性两部分:

    1、基础属性

    1)topic : 主题相当于消息的一级分类,具有相同topic的消息将发送至该topic下的消息队列中,比方说一个电商系统可以分为商品消息、订单消息、物流消息等,就可以在broker中创建商品主题、订单主题等,所有商品的消息发送至该主题下的消息队列中。

    2)消息体:即消息的内容 ,可以的字符串、对象等类型(可系列化)。消息的最大长度 是4M。

    3) 消息 Flag:消息的一个标记,RocketMQ不处理,留给业务系统使用。

    2、扩展属性

    1)tag :相当于消息的二级分类,用于消费消息时进行过滤,可为空 。

    2)keys: Message 索引键,在运维中可以根据这些 key 快速检索到消息, 可为空 。 3)waitStoreMsgOK :消息发送时是否等消息存储完成后再返回 。

    Message 的基础属性主要包括消息所属主题 topic , 消息 Flag(RocketMQ 不做处理)、 扩展属性、消息体 。

    三、同步消息

    1、创建test-rocketmq生产者工程

    1. 创建一个test-rocketmq的测试工程专门用于rocketmq的功能测试。

    test-rocketmq父工程的pom.xml如下:

    <?xml version="1.0" encoding="UTF-8"?>
     <project xmlns="http://maven.apache.org/POM/4.0.0"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
         <parent>
             <artifactId>mq</artifactId>
             <groupId>com.pbteach</groupId>
             <version>1.0-SNAPSHOT</version>
         </parent>
         <modelVersion>4.0.0</modelVersion>
     ​
         <artifactId>test-rocketmq</artifactId>
         <packaging>pom</packaging>
         <dependencies>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-spring-boot-starter</artifactId>
             </dependency>
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter-test</artifactId>
             </dependency>
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter-web</artifactId>
             </dependency>
         </dependencies>
     ​
     </project>
    

    2)创建rocketmq-producer生产者工程

    rocketmq-producer的pom.xml如下

     <?xml version="1.0" encoding="UTF-8"?>
     <project xmlns="http://maven.apache.org/POM/4.0.0"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
         <parent>
             <artifactId>test-rocketmq</artifactId>
             <groupId>com.pbteach</groupId>
             <version>1.0-SNAPSHOT</version>
         </parent>
         <modelVersion>4.0.0</modelVersion>
     ​
         <artifactId>rocketmq-producer</artifactId>
     ​
     ​
     </project>
    

    3) 新建rocketmq-producer工程 的application.yml文件

     server:
       port: 8181 #服务端口
       servlet:
         context-path: /rocketmq-producer
     ​
     spring:
       application:
         name: rocketmq-producer #指定服务名
     rocketmq:
       nameServer: 127.0.0.1:9876
       producer:
         group: demo-producer-group
    

    4)新建启动类

     /**
      * @author 攀博课堂(www.pbteach.com)
      * @version 1.0
      **/
    @SpringBootApplication
     public class ProducerApplication {
     ​
         public static void main(String[] args) {
             SpringApplication.run(ProducerApplication.class, args);
         }
     ​
     }
    

    2、发送同步消息

    package com.pbteach.test.rocketmq.message;
     ​
     import org.apache.rocketmq.spring.core.RocketMQTemplate;
     import org.springframework.beans.factory.annotation.Autowired;
     import org.springframework.stereotype.Component;
     ​
     /**
      * rocketmq发送消息类
      * @author 攀博课堂(www.pbteach.com)
      * @version 1.0
      **/
     @Component
     public class ProducerSimple {
     ​
         @Autowired
         private RocketMQTemplate rocketMQTemplate;
     ​
         /**
          * 发送同步消息
          * @param topic
          * @param msg
          */
         public void sendSyncMsg(String topic, String msg){
             rocketMQTemplate.syncSend(topic,msg);
         }
     ​
     ​
     }
    


    3、测试

    1)在test下编写测试类,发送同步消息。

    package com.pbteach.test.rocketmq.message;
     ​
     import org.junit.Test;
     import org.junit.runner.RunWith;
     import org.springframework.beans.factory.annotation.Autowired;
     import org.springframework.boot.test.context.SpringBootTest;
     import org.springframework.test.context.junit4.SpringRunner;
     ​
     /**
      * @author 攀博课堂(www.pbteach.com)
      * @version 1.0
      **/
     @RunWith(SpringRunner.class)
     @SpringBootTest
     public class ProducerSimpleTest {
     ​
         @Autowired
         private ProducerSimple producerSimple;
     ​
         //测试发送同步消息
         @Test
         public void testSendSyncMsg(){
             this.producerSimple.sendSyncMsg("my-topic", "第一条同步消息");
             System.out.println("end...");
         }
     ​
     }
     ​
    

    2)启动NameServer、Broker、管理端

    3)执行testSendSyncMsg方法

    4)观察控制台和管理端

    控制台出现end… 表示消息发送成功。

    进入管理端,查询消息。
    在这里插入图片描述
    在这里插入图片描述
    4、创建消费者工程
    1)创建消息消费者工程,pom.xml如下

    <?xml version="1.0" encoding="UTF-8"?>
     <project xmlns="http://maven.apache.org/POM/4.0.0"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
         <parent>
             <artifactId>test-rocketmq</artifactId>
             <groupId>com.pbteach</groupId>
             <version>1.0-SNAPSHOT</version>
         </parent>
         <modelVersion>4.0.0</modelVersion>
     ​
         <artifactId>rocketmq-consumer</artifactId>
     ​
     ​
     </project>
    

    2)启动类

    /**
      * @author 攀博课堂(www.pbteach.com)
      * @version 1.0
      **/
     @SpringBootApplication
     public class ConsumerApplication {
     ​
         public static void main(String[] args) {
             SpringApplication.run(ConsumerApplication.class, args);
         }
     ​
     }
    

    3)配置文件application.yml

    server:
       port: 8182 #服务端口
       servlet:
         context-path: /rocketmq-consumer
     ​
     spring:
       application:
         name: rocketmq-consumer #指定服务名
     rocketmq:
       nameServer: 127.0.0.1:9876
    


    4)编写消费消息监听类:

    package com.pbteach.test.rocketmq.message;
     ​
     import org.apache.rocketmq.spring.annotation.ConsumeMode;
     import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
     import org.apache.rocketmq.spring.core.RocketMQListener;
     import org.springframework.stereotype.Component;
     ​
     /**
      * 消费消息监听类
      * @author 攀博课堂(www.pbteach.com)
      * @version 1.0
      **/
     @Component
     @RocketMQMessageListener(topic = "my-topic",consumerGroup = "demo-consumer-group")
     public class ConsumerSimple implements RocketMQListener<String> {
     ​
         //接手到消息调用此方法
         @Override
         public void onMessage(String s) {
             System.out.println(s);
         }
     }
     ​
    

    监听消息队列 需要指定:

    topic:监听的主题

    consumerGroup:消费组,相同消费组的消费者共同消费该主题的消息,它们组成一个集群。

    5、测试

    1、启动消费者工程

    启动消费者工程,观察控制台输出“第一条同步消息”消息内容,这说明从消息队列已经读取到消息。

    2、保证消费者工程已启动,再次发送消息,观察控制台是否输出“第一条同步消息”消息内容,输出则说明接收消息成功。

    四、消息发送过程

    通过测试对同步消息的发送和接收有一个粗略的认识,下边分析具体的消息发送过程,如下图:
    在这里插入图片描述
    消息发送流程如下:

    1、Producer从NameServer中获取主题路由信息

    Broker将自己的状态上报给NameServer,NameServer中存储了每个Broker及主题、消息队列的信息。

    Producer根据 topic从NameServer查询所有消息队列,查询到的结果例如:

     [
       {"brokerName":"Broker-1","queueId":0},
       {"brokerName":"Broker-1","queueId":1},
       {"brokerName":"Broker-2","queueId":0},
       {"brokerName":"Broker-2","queueId":1}
     ]
    

    Producer按选择算法从以上队列中选择一个进行消息发送,如果发送消息失败则在下次选择的时候 会规避掉失败的broker。

    2、构建消息,发送消息

    发送消息前进行校验,比如消息的内容长度不能为0、消息最大长度、消息必要的属性是否具备等(topic、消息体,生产组等)。

    如果该topic下还没有队列则自动创建,默认一个topic下自动创建4个写队列,4个读队列 。

    为什么要多个队列 ?

    1)高可用

    当某个队列不可用时其它队列顶上。

    2)提高并发

    发送消息是选择队列进行发送,提高发送消息的并发能力。

    消息消费时每个消费者可以监听多个队列,提高消费消息的并发能力。

    生产组有什么用?

    在事务消息中broker需要回查producer,同一个生产组的producer组成一个集群,提高并发能力。

    3、监听队列,消费消息

    一个消费组可以包括多个消费者,一个消费组可以订阅多个主题。

    一个队列同时只允许一个消费者消费,一个消费者可以消费多个队列中的消息。

    消费组有两种消费模式:

    1)集群模式

    一个消费组内的消费者组成一个集群,主题下的一条消息只能被一个消费者消费。

    2)广播模式

    主题下的一条消息能被消费组下的所有消费者消费。

    消费者和broker之间通过推模式和拉模式接收消息,推模式即broker推送给消费者,拉模式是消费者主动从broker查询消息。

    五、异步消息

    producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。

    在ProducerSimple中编写发送异步消息的方法

    /**
      * 发送异步消息
      * @author 攀博课堂(www.pbteach.com)
      * @param topic
      * @param msg
      */
     public void sendASyncMsg(String topic, String msg){
         rocketMQTemplate.asyncSend(topic,msg,new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
                 //成功回调
                 System.out.println(sendResult.getSendStatus());
             }
     ​
             @Override
             public void onException(Throwable e) {
                 //异常回调
                 System.out.println(e.getMessage());
             }
         });
     }
    

    测试:

     /**
      * 测试类
      * @author 攀博课堂(www.pbteach.com)
      * @version 1.0
      **/
    @Test
     public void testSendASyncMsg() throws InterruptedException {
         this.producerSimple.sendASyncMsg("my-topic", "第一条异步步消息");
         System.out.println("end...");
         //异步消息,为跟踪回调线程这里加入延迟
         Thread.sleep(3000);
     }
    

    六、单向消息

    producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。

     /**
      * 发送单向消息
      * @author 攀博课堂(www.pbteach.com)
      * @param topic
      * @param msg
      */
     public void sendOneWayMsg(String topic, String msg){
         this.rocketMQTemplate.sendOneWay(topic,msg);
     }
    

    测试:

    略。

    七、延迟消息

    1、延迟消息介绍

    延迟消息也叫做定时消息,比如在电商项目的交易系统中,当用户下单之后超过一段时间之后仍然没有支付,此时就需要将该订单关l闭。要实现该功能的话,可以在用户创建订单时就发送一条包含订单内容的延迟消息,该消息在一段时间之后投递给消息消费者,当消息消费者接收到该消息后,判断该订单的支付状态,如果处于未支付状态,则将该订单关闭。

    RocketMQ的延迟消息实现非常简单,只需要发送消息前设置延迟的时间,延迟时间存在十八个等级(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel()设置与时间相对应的延迟级别即可。

    2、同步消息延迟

    生产端:

     /**
      * 发送延迟消息
      * 消息内容为json格式
      * @author 攀博课堂(www.pbteach.com)
      */
     public void sendMsgByJsonDelay(String topic, OrderExt orderExt) throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
         //发送同步消息,消息内容将orderExt转为json
         Message<OrderExt> message = MessageBuilder.withPayload(orderExt).build();
         //指定发送超时时间(毫秒)和延迟等级
         this.rocketMQTemplate.syncSend(topic,message,1000,3);
     ​
         System.out.printf("send msg : %s",orderExt);
     }
    

    消费端:

    同自定义消息格式章节。

    测试:

    //测试发送同步消息
     @Test
     public void testSendMsgByJsonDelay() throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
         OrderExt orderExt = new OrderExt();
         orderExt.setId(UUID.randomUUID().toString());
         orderExt.setCreateTime(new Date());
         orderExt.setMoney(168L);
         orderExt.setTitle("测试订单");
         this.producerSimple.sendMsgByJsonDelay("my-topic-obj",orderExt);
         System.out.println("end...");
     }
    

    3、异步消息延迟

    生产端:

     /**
      * 发送异步延迟消息
      * 消息内容为json格式
      * @author 攀博课堂(www.pbteach.com)
      */
     public void sendAsyncMsgByJsonDelay(String topic, OrderExt orderExt) throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
         //消息内容将orderExt转为json
         String json = this.rocketMQTemplate.getObjectMapper().writeValueAsString(orderExt);
         org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(topic,json.getBytes(Charset.forName("utf-8")));
         //设置延迟等级
         message.setDelayTimeLevel(3);
         //发送异步消息
         this.rocketMQTemplate.getProducer().send(message,new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
                 System.out.println(sendResult);
             }
     ​
             @Override
             public void onException(Throwable throwable) {
                 System.out.println(throwable.getMessage());
             }
         });
     ​
     ​
         System.out.printf("send msg : %s",orderExt);
     }
    

    消费端:

    同自定义消息格式章节。

    测试

    //测试发送异步消息
     @Test
     public void testSendAsyncMsgByJsonDelay() throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
         OrderExt orderExt = new OrderExt();
         orderExt.setId(UUID.randomUUID().toString());
         orderExt.setCreateTime(new Date());
         orderExt.setMoney(168L);
         orderExt.setTitle("测试订单");
         this.producerSimple.sendAsyncMsgByJsonDelay("my-topic-obj",orderExt);
         System.out.println("end...");
         Thread.sleep(20000);
     }
    

    八、消费重试

    1、什么是消费重试
    当消息发送到Broker成功,在被消费者消费时如果消费者没有正常消费,此时消息会重试消费。消费重试存在两种场景:

    1)消息没有被消费者接收,比如消费者与broker存在网络异常。此种情况消息会一直被消费重试。

    2)当消息已经被消费者成功接收,但是在进行消息处理时出现异常,消费端无法向Broker返回成功,这种情况下RocketMQ会不断重试。本小节重点讨论第二个场景。

    针对第二种消费重试的场景,borker是怎么知道重试呢?

    消费者在消费消息成功会向broker返回成功状态,否则会不断进行消费重试。

    2、处理策略
    当消息在消费时出现异常,此时消息被不断重试消费。RocketMQ会一直重试消费吗?

    答案是不会!

    消息会按照延迟消息的延迟时间等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)从第3级开始重试,每试一次如果还不成功则延迟等级加1。

    比如:一条消息消费失败,等待10s(第3级)进行重试,如果还没有被成功消费则延迟等级加1,即按第4级别延迟等待,等30s继续进行重试,如此进行下去,直到重试16次。

    当重试了16次还未被成功消费将会投递到死信队列,到达死信队列的消息将不再被消费。

    实际生产中的处理策略是什么呢?

    实际生产中不会让消息重试这么多次,通常在重试一定的次数后将消息写入数据库,由另外单独的程序或人工去处理。

    项目使用的Spring整合RocketMQ的方式,消费者实现RocketMQListener的onMessage方法,在此方法中实现处理策略的示例代码如下:

     /**
      * 测试消费重试
      * @author 攀博课堂(www.pbteach.com)
      */
     public class ConsumerSimple implements RocketMQListener<MessageExt> {
     ​
     ​
         @Override
         public void onMessage(MessageExt messageExt) {
             //取出当前重试次数
             int reconsumeTimes = messageExt.getReconsumeTimes();
             //当大于一定的次数后将消息写入数据库,由单独的程序或人工去处理
            if(reconsumeTimes >=2){
                //将消息写入数据库,之后正常返回
                return ;
            }
             throw new RuntimeException(String.format("第%s次处理失败..",reconsumeTimes));
         }
     }
    
    展开全文
  • Android 提供了一个好用的工具---AsyncTask ,方便我们在子线程中对 UI 进行操作,AsyncTask 背后的实现原理也是基于异步消息处理机制的。
  • 所有异步消息使用的示例都使用Spring 进行消息使用。 Spring JMS支持三种类型的消息侦听器,包括: [MessageListenerAdapter]( 异步目录中的示例都演示了这三种类型的消息侦听器。 运行示例 这些示例旨在使用...
  • 同步消息和异步消息

    千次阅读 2020-08-15 12:18:23
    同步消息和异步消息区别 两者使用场景不一样,比如说A给B发送一封电子邮件,A是不需要知道B是否收到就可以了的,把自己的信息传达出去,这样的场景就是异步消息。因为在这个过程中A在乎的是把某件事情传达出去就...

    同步消息和异步消息区别
    在这里插入图片描述
    两者使用场景不一样,比如说A给B发送一封电子邮件,A是不需要知道B是否收到就可以了的,把自己的信息传达出去,这样的场景就是异步消息。因为在这个过程中A在乎的是把某件事情传达出去就可以,而不必在乎其他人的状态,比如张贴告示也是这样,不需要知道每个人都是否知道这则告示的内容,而是张贴出去让大家基本知晓就可以。

    如果在付款的时候,A已经付款了,这个时候如果没有收到支付成功的状态提示的话就会在想自己是否已经支付成功了呢?就会一直处于等待状态,直到系统反馈一个消息,要么是支付成功要么是支付失败才回进行后续的操作。这样的两个例子就能很简单的区分同步消息和异步消息了。

    展开全文
  • 异步消息研讨会 总览 异步消息研讨会是单个实验室的集合,涵盖了异步消息传递的不同方面和模式。 您将在研讨会期间广泛使用 , 和 ,我们还将使用 , 和。 每个实验可以单独完成,各轮之间没有依赖性。 每个实验需要...
  • 包括调用消息(同步消息),异步消息,返回消息,阻止消息,超时消息 调用消息:(UML早期版本也称为同步消息) 定义:调用消息(procedure call )消息的发送者把控制传递给消息的接收者,然后停止活动,等待消息...

    顺序图中的一个重要的概念就是消息。

               包括调用消息(同步消息),异步消息,返回消息,阻止消息,超时消息
    

    调用消息:(UML早期版本也称为同步消息)

    定义:调用消息(procedure call )消息的发送者把控制传递给消息的接收者,然后停止活动,等待消息接收者放弃或者返回控制。调用消息可以用来表示同步的意义。

    举例:下课和上课就是同步消息,只有当发出上课的消息时,同学们接收消息,停止活动,然后再等待下课的这个消息,也就是说没有上课就不会有下课这个消息。再举个例子,如c语言编译器报错后,如果不解决不进行下一步运行操作。

    异步消息

    定义:异步消息的发送者通过消息把信号传递给消息的接收者,然后继续自己的活动,不等待接收者返回消息或控制。

    举例:在网上购物时下单后,商家并未做出任何回应,但你仍然可以进行下一次的购物,并不需要等待返回信息,可以继续下一次的操作。

    展开全文
  • Android中异步消息和同步屏障

    千次阅读 2019-10-19 21:06:51
    Android消息队列MessageQueue中加入的消息分成同步消息和异步消息,在平常开发中接触到的消息基本上都是同步消息,同步消息会被放到消息队列的队尾,Looper在消息循环时从队列头部不断取出同步消息执行。 在Android...

    Android消息队列MessageQueue中加入的消息分成同步消息和异步消息,在平常开发中接触到的消息基本上都是同步消息,同步消息会被放到消息队列的队尾,Looper在消息循环时从队列头部不断取出同步消息执行。

    在Android系统中存在一个VSync消息,它主要负责每16ms更新一次屏幕展示,如果用户同步消息在16ms内没有执行完成,那么VSync消息的更新操作就无法执行在用户看来就出现了掉帧或卡顿的情况,为此Android开发要求每个消息的执行需要限制在16ms之内完成。但是消息队列中可能会包含多个同步消息,假如当前主线程消息队列有10个同步消息,每个同步消息要执行10ms,总共也就需要执行100ms,这段时间内就会有近7帧无法正常刷新展示,应用执行过程中遇到这种情况还是很普遍的。

    Android系统设计时自然也会考虑到这种情况,同步消息会导致延迟主要原因在于排队等候,如果消息发送后不必排队等待直接就执行就能够解决消息延迟问题。Android系统中的异步消息就是专门解决消息处理延迟的问题,它需要配合同步屏障(SyncBarrier)一起工作,在发送异步消息的时候向消息队列投放同步屏障对象,消息队列会返回同步屏障的token,此时消息队列中的同步消息都会被暂停处理,优先执行异步消息处理,等异步消息处理完成再通过消息队列移除token对应的同步屏障,消息队列继续之前暂停的同步消息处理。MessageQueue中同步屏障处理的方法都是隐藏API,需要通过反射方法来调用。

    public class SyncBarrierActivity extends AppCompatActivity {
    	private Handler handler;
    	private static final String TAG = "SyncBarrierActivity";
    	private int token; // 同步屏障对应的token值
    
    	@Override
    	protected void onCreate(Bundle savedInstanceState) {
    		super.onCreate(savedInstanceState);
    		setContentView(R.layout.activity_sync_barrier);
    		handler = new Handler();
    			Message message1 = Message.obtain(handler, new Runnable() {
    				@Override
    				public void run() {
    					Log.d(TAG, "1000");
    				}
    		});
    		// 省略消息message2定义,run()中代码Log.d(TAG, "2000");
    		// 省略消息message3定义,run()中代码Log.d(TAG, "3000");
    		Message message4 = Message.obtain(handler, new Runnable() {
    			@Override
    			public void run() {
    				Log.d(TAG, "4000");
    				removeSyncBarrier(); // 移除同步屏障
    			}
    	   });
    	
    	   // 设置3秒后和4秒后执行的消息为异步消息
    	   message3.setAsynchronous(true);
    	   message4.setAsynchronous(true);
    	   handler.sendMessageDelayed(message1, 1000); // 发送1秒后执行的同步消息
    	   handler.sendMessageDelayed(message2, 2000); // 发送2秒后执行的同步消息
    	   handler.sendMessageDelayed(message3, 3000); // 发送3秒后执行的异步消息
    	   postSyncBarrier(); // 投递同步屏障到消息队列中
    	   handler.sendMessageDelayed(message4, 4000); // 发送4秒后执行的异步消息
       }
    
       // 反射执行投递同步屏障,省略try..catch
        public void postSyncBarrier() {
    	   Method method = MessageQueue.class.getDeclaredMethod("postSyncBarrier");
    	   token = (int) method.invoke(Looper.getMainLooper().getQueue());
       }
    
      // 反射执行移除同步屏障,省略try..catch
       public void removeSyncBarrier() {
    	   Method method = MessageQueue.class
    	.     getDeclaredMethod("removeSyncBarrier", int.class);
    	    method.invoke(Looper.getMainLooper().getQueue(), token);}
       }
    }
    
    //`~ 执行结果
    com.example.mytestproject D/SyncBarrierActivity: 3000 // 优先执行3秒后的异步消息
    com.example.mytestproject D/SyncBarrierActivity: 4000 // 接着执行4秒后的异步消息
    // 4秒消息移除了同步屏障,开始执行同步消息 
    com.example.mytestproject D/SyncBarrierActivity: 1000 
    com.example.mytestproject D/SyncBarrierActivity: 2000 
    

    示例代码中会在主线程中先抛入一个1秒后执行和一个2秒后执行的同步消息,接着向主线程投递一个同步屏障,同步屏障后面接着投递一个3秒后执行和一个4秒后执行的异步消息,4秒后执行的异步消息会移除之前投递的同步屏障。

    假如投递的四个消息全部时同步消息那么它们应该按照时间顺序依次执行,由于同步屏障的存在1秒和2秒执行的消息即使到了执行时间依然没有被执行,3秒和4秒的消息成功通过同步屏障按时执行,在移除同步屏障后1秒和2秒的同步消息得以正常执行。

    MessageQueue之所有将同步屏障的接口都变成隐藏接口是不想普通的开发者向主线程队列投递同步屏障影响VSync消息的正常执行,开发过程中尽量不要使用异步消息和同步屏障。

    展开全文
  • 01 王晓宇 电商异步消息系统的实践.pdf01 王晓宇 电商异步消息系统的实践.pdf
  • 异步消息处理机制

    千次阅读 2019-07-31 20:38:45
    发送和处理消息,利用sendMessage()发送消息,handleMessage()处理消息 MessageQueue.消息队列,用于存放所有message,每个线程只有以一个MessageQueue对象 Looper.是MessageQueue的管家,发现MessageQueue有消息时...
  • Handler异步消息与同步屏障(SyncBarrier)

    千次阅读 2020-10-15 09:59:12
    Android的消息机制之前有一篇文章有写,里面具体讲到了Handler怎么发送和处理消息的整个过程。感兴趣的同学可以先跳转过去看看 从Handler.post(Runnable r)再一次梳理Android的消息机制(以及handler的内存泄露) 在...
  • 发送异步消息是指producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。 相对发送同步消息...
  • Android 异步消息处理机制的几种实现

    千次阅读 2018-07-13 15:27:50
    Android 异步消息处理机制的几种实现 1、概述 Android需要更新ui的话就必须在ui线程上进行操作。否则就会抛异常。 假如有耗时操作,比如:在子线程中下载文件,通知ui线程下载进度,ui线程去更新进度等,这个...
  • 第11章 异步消息与异步调用

    千次阅读 2018-09-24 16:40:11
    11.1 JMS消息介绍 11.1.1 JMS概述 JMS(Java Message Service,即Java消息服务)是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库...
  • 一个使用springamqp实现的异步消息队列的股票系统,来自springamqp的官网,对于学习springamqp很有帮助。
  • Handler异步加载图片 GridView 上拉刷新 下拉刷新 里面也有其他控件(比如ListView ScrollView等等)的上拉刷新 下拉刷新
  • 基于异步消息模式的通信

    千次阅读 2020-02-24 19:12:35
    基于异步消息模式的通信 使用消息机制时,微服务之间的通信采用异步交换消息的方式来完成。基于消息机制的应用程序通常使用消息代理,它充当服务之间的中介。 消息 消息由消息头部和消息主体组成。 头部包括:...
  • 消息驱动Bean(MDB)是设计用来专门处理基于消息请求的组件,文章在简单介绍WebLogic的基础上重点从4个方面讨论了消息驱动Bean。

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 366,229
精华内容 146,491
关键字:

异步消息