-
2021-12-01 14:55:11
一、认识异步消息处理
所谓异步消息就是发送一个消息,不需要等待返回,随时可以再发送下一个消息。
异步消息处理线程启动后会进入一个无限的循环体之中,每循环一次,从其内部的消息队列中取出一个消息,然后回调相应的消息处理函数,执行完成一个消息后则继续循环。若消息队列为空,线程则会阻塞等待。二、处理机制
Android 中的异步消息处理主要由四个部分组成:Message、Handler、MessageQueue 和 Looper。- Message
Message 是线程之间传递的消息,它可以在内部携带少量信息,用于在不同线程之间交换数据。Message 的 what、arg1、arg2 字段可以用来携带一些整型数据,obj 字段可以携带 Object 对象。 - Handler
Handler 是消息的处理者,也是线程之间的通讯兵,它主要用于发送和处理消息。发送消息一般使用 Handler 的 sendMessage()方法,发出的消息经过一系列的处理,最终会传递到 Handler 的 handleMessage() 方法。 - MessageQueue
MessageQueue 是消息队列,它主要用于存放所有由 Handler 发送过来的消息,这部分消息会一直在消息队列中等待被处理。每个线程中只会有一个MessageQueue 对象。 - Looper
Looper 是每个线程中 MessageQueue 的管家,调用 Looper 的 loop() 方法后,就会进入到一个无限循环当中;每当发现 MessageQueue 中存在一条消息,就会将其取出,并传递到 handleMessage()方法当中。每个线程中也只会有一
个 Looper 对象。
利用该消息机制实现20个随机数的生成,在MainActivity中实现,代码如下:
public class MainActivity extends AppCompatActivity { private TextView textview; private int i = 0; private Handler handler = new Handler(){ @Override public void handleMessage(Message msg) { switch (msg.what) { case 0: textview.setText("已产生 20 个随机数结束"); break; case 1: int radom = msg.arg1; textview.setText(Integer.toString(radom)); break; default: } } }; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); textview = findViewById(R.id.textview); new Thread(new Runnable() { @Override public void run() { Message message = new Message(); i++; if(i<=20) { int radom = (int)(Math.random()*1000); message.what = 1; message.arg1 = radom; handler.sendMessage(message); handler.postDelayed(this, 1000); }else{ message.what = 0; handler.sendMessage(message); } } }).start(); } }
更多相关内容 - Message
-
Java异步处理机制实例详解
2020-08-29 10:05:39本文涉及Java编程中异步处理机制的简单介绍和一个相关实例,相信通过这篇文章,大家能对异步处理有更多的了解。 -
Android异步消息处理机制实现原理详解
2021-01-20 08:32:00消息处理机制主要对象:Looper,Handler,Message(还有MessageQueue和Runnable) Looper不断从MessageQueue消息队列中取出一个Message,然后传给Handle,如此循环往复,如果队列为空,那么它会进入休眠。 这些类的主要... -
架构设计 | 异步处理流程,多种实现模式详解
2021-03-05 18:39:56必须强调一个基础逻辑,异步是一种设计理念,异步操作不等于多线程,MQ中间件,或者消息广播,这些是可以实现异步处理的方式。同步处理和异步处理相对,需要实时处理并响应,一旦超过时间会结束会话,在该过程中调用...一、异步处理
1、异步概念
异步处理不用阻塞当前线程来等待处理完成,而是允许后续操作,直至其它线程将处理完成,并回调通知此线程。
必须强调一个基础逻辑,异步是一种设计理念,异步操作不等于多线程,MQ中间件,或者消息广播,这些是可以实现异步处理的方式。
同步处理和异步处理相对,需要实时处理并响应,一旦超过时间会结束会话,在该过程中调用方一直在等待响应方处理完成并返回。同步类似电话沟通,需要实时对话,异步则类似短信交流,发送消息之后无需保持等待状态。
2、异步处理优点
虽然异步处理不能实时响应,但是处理复杂业务场景,多数情况都会使用异步处理。
异步可以解耦业务间的流程关联,降低耦合度;
降低接口响应时间,例如用户注册,异步生成相关信息表;
异步可以提高系统性能,提升吞吐量;
流量削峰即把请求先承接下来,然后在异步处理;
异步用在不同服务间,可以隔离服务,避免雪崩;
异步处理的实现方式有很多种,常见多线程,消息中间件,发布订阅的广播模式,其根据逻辑在于先把请求承接下来,放入容器中,在从容器中把请求取出,统一调度处理。
注意:一定要监控任务是否产生积压过度情况,任务如果积压到雪崩之势的地步,你会感觉每一片雪花都想勇闯天涯。
3、异步处理模式
异步流程处理的实现有好多方式,但是实际开发中常用的就那么几种,例如:
基于接口异步响应,常用在第三方对接流程;
基于消息生产和消费模式,解耦复杂流程;
基于发布和订阅的广播模式,常见系统通知
异步适用的业务场景,对数据强一致性的要求不高,异步处理的数据更多时候追求的是最终一致性。
二、接口响应异步
1、流程描述
基于接口异步响应的方式,有一个本地业务服务,第三方接口服务,流程如下:
本地服务发起请求,调用第三方服务接口;
请求包含业务参数,和成功或失败的回调地址;
第三方服务实时响应流水号,作为该调用的标识;
之后第三方服务处理请求,得到最终处理结果;
如果处理成功,回调本地服务的成功通知接口;
如果处理失败,回调本地服务的失败通知接口;
整个流程基于部分异步和部分实时的模式,完整处理;
注意:如果本地服务多次请求第三方服务,需要根据流水号判断该请求的状态,业务的状态设计也是极其复杂,要根据流水号和状态追溯整个流程的执行进度,避免错乱。
2、流程实现案例
模拟基础接口
@RestController
public class ReqAsyncWeb {
private static final Logger LOGGER = LoggerFactory.getLogger(ReqAsyncWeb.class);
@Resource
private ReqAsyncService reqAsyncService ;
// 本地交易接口
@GetMapping("/tradeBegin")
public String tradeBegin (){
String sign = reqAsyncService.tradeBegin("TradeClient");
return sign ;
}
// 交易成功通知接口
@GetMapping("/tradeSucNotify")
public String tradeSucNotify (@RequestParam("param") String param){
LOGGER.info("tradeSucNotify param={"+ param +"}");
return "success" ;
}
// 交易失败通知接口
@GetMapping("/tradeFailNotify")
public String tradeFailNotify (@RequestParam("param") String param){
LOGGER.info("tradeFailNotify param={"+ param +"}");
return "success" ;
}
// 第三方交易接口
@GetMapping("/respTrade")
public String respTrade (@RequestParam("param") String param){
LOGGER.info("respTrade param={"+ param +"}");
reqAsyncService.respTrade(param);
return "NO20200520" ;
}
}
模拟第三方处理
@Service
public class ReqAsyncServiceImpl implements ReqAsyncService {
private static final String serverUrl = "http://localhost:8005" ;
@Override
public String tradeBegin(String param) {
String orderNo = HttpUtil.get(serverUrl+"/respTrade?param="+param);
if (StringUtils.isEmpty(orderNo)){
return "Trade..Fail...";
}
return orderNo ;
}
@Override
public void respTrade(String param) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread thread01 = new Thread(
new RespTask(serverUrl+"/tradeSucNotify?param="+param),"SucNotify");
Thread thread02 = new Thread(
new RespTask(serverUrl+"/tradeFailNotify?param="+param),"FailNotify");
thread01.start();
thread02.start();
}
}
三、生产消费异步
1、流程描述
这里基于Kafka中间件,演示流程消息生成,消息处理的异步解耦流程,基本步骤:
消息生成之后,写入Kafka队列 ;
消息处理方获取消息后,进行流程处理;
消息在中间件提供的队列中持久化存储 ;
消息发起方如果挂掉,不影响消息处理 ;
消费方如果挂掉,不影响消息生成;
基于这种消息中间件模式,完成业务解耦,提高系统吞吐量,是架构中常用的方式。
2、流程实现案例
消息发送
@Service
public class KafkaAsyncServiceImpl implements KafkaAsyncService {
@Resource
private KafkaTemplate kafkaTemplate;
@Override
public void sendMsg(String msg) {
// 这里Topic如果不存在,会自动创建
kafkaTemplate.send("kafka-topic", msg);
}
}
消息消费
@Component
public class KafkaConsumer {
private static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = "kafka-topic")
public void listenMsg (ConsumerRecord,String> record) {
String value = record.value();
LOGGER.info("KafkaConsumer01 ==>>"+value);
}
}
注意:这里就算有多个消息消费方,也只会在一个消费方处理消息,这就是该模式的特点。
四、发布订阅异步
1、流程描述
这里基于Redis中间件,说明消息广播模式流程,基本步骤:
提供一个消息传递频道channel;
多个订阅频道的客户端client;
消息通过PUBLISH命令发送给频道channel ;
客户端就会收到频道中传递的消息 ;
之所以称为广播模式,该模式更注重通知下发,流程交互性不强。实际开发场景:运维总控系统,更新了某类服务配置,通知消息发送之后,相关业务线上的服务在拉取最新配置,更新到服务中。
2、流程实现案例
发送通知消息
@Service
public class RedisAsyncServiceImpl implements RedisAsyncService {
@Resource
private StringRedisTemplate stringRedisTemplate ;
@Override
public void sendMsg(String topic, String msg) {
stringRedisTemplate.convertAndSend(topic,msg);
}
}
客户端接收
@Service
public class ReceiverServiceImpl implements ReceiverService {
private static final Logger LOGGER = LoggerFactory.getLogger("ReceiverMsg");
@Override
public void receiverMsg(String msg) {
LOGGER.info("Receiver01 收到消息:msg-{}",msg);
}
}
配置广播模式
@Configuration
public class SubMsgConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory factory,
MessageListenerAdapter msgListenerAdapter,
MessageListenerAdapter msgListenerAdapter02){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
//注册多个监听,订阅一个主题,实现消息广播
container.addMessageListener(msgListenerAdapter, new PatternTopic("topic:msg"));
container.addMessageListener(msgListenerAdapter02, new PatternTopic("topic:msg"));
return container;
}
@Bean
MessageListenerAdapter msgListenerAdapter(ReceiverService receiverService){
return new MessageListenerAdapter(receiverService, "receiverMsg");
}
@Bean
MessageListenerAdapter msgListenerAdapter02(ReceiverService02 receiverService02){
return new MessageListenerAdapter(receiverService02, "receiverMsg");
}
@Bean
ReceiverService receiverService(){
return new ReceiverServiceImpl();
}
@Bean
ReceiverService02 receiverService02(){
return new ReceiverServiceImpl02();
}
}
这里配置了多个订阅的客户端。
五、任务积压监控
生成一个消息,就因为有一个处理该消息的任务要执行,这就导致任务可能出现积压的情况,常见原因大致有如下几个:
任务产生的服务过多,任务处理的服务过少,不均衡;
任务处理时间太长,也导致生产过剩;
中间件本身容量偏小,需要扩容或集群化管理;
如果任务积压过多,可能要对任务生成进行流量控制,或者提升任务的处理能力,从而避免雪崩情况。
六、源代码地址
GitHub·地址
https://github.com/cicadasmile/data-manage-parent
GitEE·地址
https://gitee.com/cicadasmile/data-manage-parent
推荐阅读:《架构设计系列》,萝卜青菜,各有所需
-
Java 异步回调机制实例解析
2021-02-12 10:17:45什么是回调?今天傻傻地截了张图问了...所以在百度百科中是这样的:软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用、回调和异步调用。回调是一种特殊的调用,至于三种方式也有点不...什么是回调?今天傻傻地截了张图问了下,然后被陈大牛回答道“就一个回调…”。此时千万个草泥马飞奔而过
哈哈,看着源码,享受着这种回调在代码上的作用,真是美哉。不妨总结总结。
一、什么是回调
回调,回调。要先有调用,才有调用者和被调用者之间的回调。所以在百度百科中是这样的:
软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用、回调和异步调用。
回调是一种特殊的调用,至于三种方式也有点不同。
1、同步回调,即阻塞,单向。
2、回调,即双向(类似自行车的两个齿轮)。
3、异步调用,即通过异步消息进行通知。
二、CS中的异步回调(java案例)
比如这里模拟个场景:客户端发送msg给服务端,服务端处理后(5秒),回调给客户端,告知处理成功。代码如下:
回调接口类:
/**
* @author Jeff Lee
* @since 2015-10-21 21:34:21
* 回调模式-回调接口类
*/
public interface CSCallBack {
public void process(String status);
}
模拟客户端:
/**
* @author Jeff Lee
* @since 2015-10-21 21:25:14
* 回调模式-模拟客户端类
*/
public class Client implements CSCallBack {
private Server server;
public Client(Server server) {
this.server = server;
}
public void sendMsg(final String msg){
System.out.println("客户端:发送的消息为:" + msg);
new Thread(new Runnable() {
@Override
public void run() {
server.getClientMsg(Client.this,msg);
}
}).start();
System.out.println("客户端:异步发送成功");
}
@Override
public void process(String status) {
System.out.println("客户端:服务端回调状态为:" + status);
}
}
模拟服务端:
/**
* @author Jeff Lee
* @since 2015-10-21 21:24:15
* 回调模式-模拟服务端类
*/
public class Server {
public void getClientMsg(CSCallBack csCallBack , String msg) {
System.out.println("服务端:服务端接收到客户端发送的消息为:" + msg);
// 模拟服务端需要对数据处理
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("服务端:数据处理成功,返回成功状态 200");
String status = "200";
csCallBack.process(status);
}
}
测试类:
/**
* @author Jeff Lee
* @since 2015-10-21 21:24:15
* 回调模式-测试类
*/
public class CallBackTest {
public static void main(String[] args) {
Server server = new Server();
Client client = new Client(server);
client.sendMsg("Server,Hello~");
}
}
运行下测试类 — 打印结果如下:
客户端:发送的消息为:Server,Hello~
客户端:异步发送成功
服务端:服务端接收到客户端发送的消息为:Server,Hello~
(这里模拟服务端对数据处理时间,等待5秒)
服务端:数据处理成功,返回成功状态 200
客户端:服务端回调状态为:200
一步一步分析下代码,核心总结如下
1、接口作为方法参数,其实际传入引用指向的是实现类
2、Client的sendMsg方法中,参数为final,因为要被内部类一个新的线程可以使用。这里就体现了异步。
3、调用server的getClientMsg(),参数传入了Client本身(对应第一点)。
还有值得一提的是
— 开源代码都在我的gitHub上哦~
三、回调的应用场景
回调目前运用在什么场景比较多呢?从操作系统到开发者调用:
1、Windows平台的消息机制
2、异步调用微信接口,根据微信返回状态对出业务逻辑响应。
3、Servlet中的Filter(过滤器)是基于回调函数,需容器支持。
补充:其中 Filter(过滤器)和Interceptor(拦截器)的区别,拦截器基于是Java的反射机制,和容器无关。但与回调机制有异曲同工之妙。
总之,这设计让底层代码调用高层定义(实现层)的子程序,增强了程序的灵活性。
四、模式对比
上面讲了Filter和Intercepter有着异曲同工之妙。其实接口回调机制和一种
观察者模式:
GOF说道 — “定义对象的一种一对多的依赖关系,当一个对象的状态发送改变的时候,所有对他依赖的对象都被通知到并更新。”它是一种模式,是通过接口回调的方法实现的,即它是一种回调的体现。
接口回调:
与观察者模式的区别是,它是种原理,而非具体实现。
五、心得
总结四步走:
机制,即是原理。
模式,即是体现。
记住具体场景,常见模式。
然后深入理解原理。
来源:51CTO
-
Android异步消息机制
2021-11-16 15:12:16异步消息机制 Message Message是在线程之间传递的消息,可以在内部携带少量信息。 成员: what, arg1, arg2 Handler 用于发送和处理消息。 成员方法: sendMessage() handleMessage() MessageQueue 消息队列,用于...异步消息机制
- Message
Message是在线程之间传递的消息,可以在内部携带少量信息。
成员:
what, arg1, arg2 - Handler
用于发送和处理消息。
成员方法:
sendMessage()
handleMessage() - MessageQueue
消息队列,用于存放所有通过Handler发送的消息。每个线程中只有一个MessageQueue对象。 - Looper
每个线程中的MessageQueue管家。调用Looper的loop()方法后,就会进入一个无限循环,然后每当发现MessageQueue中存在一条消息,就会把它取出,并传递到Handler的handleMessage()方法中。每个线程只有一个Looper对象。
异步消息处理机制示意图 - Message
-
rabbitmq异步消息确认机制
2022-01-25 22:08:47rabbitMq在实际发消息的时候会把 nextPublishSeqNo 存入到 unconfirmedSet 中 // com.rabbitmq.client.impl.ChannelN#basicPublish() public void basicPublish(String exchange, String routingKey, boolean ... -
异步处理(JAVA)
2012-01-10 12:01:47能同时并发处理多个请求,并能按一定机制调度: 用一个队列来存放请求,所以只能按FIFO机制调度,你可以改用LinkedList,就可以简单实现一个优先级(优先级高的addFirst,低的addLast). 三.有能力将调用的边界从线程扩展到... -
异步消息处理机制-Android中Handler原理(续)
2015-10-22 21:39:46异步消息处理线程是指线程启动后会进入一个无限循环,每循环一次,从内部的消息队列里面取出一个消息,并回调相应的消息处理函数。一般在任务常驻,比如用户交互任务的情况下使用异步消息处理线程。之前在Android中... -
Java异步实现的N种方式
2021-12-29 11:07:38目录 背景 Future 描述 样例 优缺点 ListenableFuture 描述 样例 优缺点 CallbackHell 描述 样例 CompleteableFuture ...计算结果完成时的处理 ...异步编程现在受到了越来越多的关注,尤其是在IO... -
java 真正的异步处理(future - listen机制)
2020-04-27 16:14:50java 真正的异步处理常规的异步处理真正的异步处理(futuer -> listen机制) 常规的异步处理 常规的异步处理,网上介绍的或者是大家用的一般都是FutureTask+Callable,这是jUC中提出的内容。直接上代码来看: ... -
Android异步消息处理机制详解及源码分析
2015-05-28 09:20:12怎么样,这和你平时用的Handler一样吧,对于Handler异步处理的简单基础示例先说到这,接下来依据上面示例的写法分析原因与源代码原理。 3 分析Android 5.1.1(API 22)异步消息机制源码 3-1 看看Handler的... -
使用SpringJMS轻松实现异步消息传递
2021-02-04 09:42:36北京火龙果软件工程技术中心异步进程通信是面向...它提供的模板机制隐藏了典型的JMS实现的细节,这样开发人员可以集中精力放在处理消息的实际工作中,而不用担心如何去创建,访问或清除JMS资源。本文将对SpringJMSAP -
Android异步消息机制从入门到精通
2022-03-12 14:12:00一、什么是Android异步消息机制? 二、异步消息机制入门 三、源码解析 1.Message 1.1消息内容 1.2.处理消息 1.3.缓存机制 1.4.小结 2.Handler和Looper 2.1.Handler里的Looper 2.2.Handler发送消息 2.3.... -
Java实现异步的四种方式
2021-06-07 16:09:07目录1、开线程2、Future3、CompletableFuture4、Async注解 1、开线程 /** * 开线程 */ public class ThreadTest { public void test() { ... //做处理 try { Thread.sleep(1000); } catch (InterruptedExcep -
JAVA实现async/await异步处理等待机制
2020-04-13 10:08:33搜遍了百度都没看到JAVA有实现async/await的方案,而js 、.net 、scala都有await,心里感觉就特别不爽,为什么JAVA没有呢,实现async/await很难吗,参考了scala的实现方式,感觉看到了一点希望,觉得java也是可以像... -
Android服务(包括多线程和异步消息处理)
2017-09-17 12:06:12多线程与Java一样两种方式:1.继承Thread 2.实现Runnable接口 由于继承Thread方式耦合性比较高,因此我们会使用实现接口的方式来编写Android的多线程方式。Android与Java多线程不同点:1.Android中的更新UI必须在... -
异步http请求处理机制
2021-12-23 10:21:55http 异步 请求 get post -
Java 异步消息处理
2013-10-15 17:23:17在前一节实现异步调用的基础上 , 现在我们来看一下一个完善的 Java 异步消息处理机制 . [ 写在本节之前 ] 在所有这些地方 , 我始终没有提到设计模式这个词 , 而事实上 , 多线程编程几乎每一步都在... -
Android多线程及异步任务消息处理机制(一)--Handler的使用
2014-08-26 12:39:16我们知道,不管在任何的语言或操作系统平台(Android系统也不例外),多线程、多进程和异步任务的相关技术地讨论都是永恒的话题,很多的开发需求都需要使用多线程及异步任务以便实现多任务的同时执行和灵活的用户... -
//代码片16 public void dispatchMessage(Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { ... }