精华内容
下载资源
问答
  • kafka 延时消息处理

    千次阅读 2020-08-10 01:45:04
    作为一款优秀的消息处理服务,kafka 具有完善的事务管理,状态管理和灾难恢复功能。只要我们稍加变通一下,kafka 也能作为延迟消息处理的解决方案,而且实现上比用数据库简单得多。 以下代码均在 sp...

        你一定遇到过这种情况,接收到消息时并不符合马上处理的条件(例如频率限制),但是又不能丢掉,于是先存起来,过一阵子再来处理。系统应该怎么设计呢?可能你会想到数据库,用一个字段来标记执行的状态,或者设置一个等待的时间戳,不管是哪种都需要反复地从数据库存取,还要考虑出异常情况状态的维护。

        作为一款优秀的消息处理服务,kafka 具有完善的事务管理,状态管理和灾难恢复功能。只要我们稍加变通一下,kafka 也能作为延迟消息处理的解决方案,而且实现上比用数据库简单得多。

        以下代码均在 spring-boot 2.0.5 和 spring-kafka 2.1.10 中测试通过。建议事先阅读文档 https://docs.spring.io/spring-kafka/docs/2.5.4.RELEASE/reference/html/#receiving-messages 以便能很好地理解以下内容。

    设计思路

        设计 2 个队列(topic),一个收到消息马上执行,另一个用来接收需延迟处理的消息。话句话说,接收延迟消息的队列直到消息可执行之前一直在 block 状态,所以有局限性,定时不能非常精确,并且任务执行次序与加进来的次序是一致的。
     

    spring-boot 的配置

    application.yml
    ————————————————————
    
    spring:
      ## kafka
      kafka:
        bootstrap-servers: 127.0.0.1:9092
        consumer:
          group-id: myGroup
          auto-offset-reset: earliest
          enable-auto-commit: false
          properties:
            max:
              poll:
                interval:
                  # 设置时间必须比延迟处理的时间大,不然会报错
                  ms: 1200000
        listener:
          # 把提交模式改为手动
          ack-mode: MANUAL
     
     
    kafka 默认的消费模式是自动提交,意思是,当 MessageListener 收到消息,执行处理方法后自动提交已完成状态,该消息就从队列里移除了。配置 ack-mode: MANUAL 改为手动提交后,我们就可以根据需要保留数据在消息队列,以便以后再处理。
    max.poll.interval.ms 设小了可能会收到下面的错误:
    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
     
    请务必设置一个比等待执行时间更长的时间。
     

    发送消息

    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    public void myAction(){
        // 定义 data
        // 任务推送到 Kafka
        kafkaTemplate.send(“myJob", data.toString());
    }

    该部分没有特别的地方,跟普通的消息消息发送一样。

    接收消息

    定义两个 topic:myJob 和 myJob-delay
    @SpringBootApplication
    @ServletComponentScan
    public class Application {
        
        @KafkaListener(topics = “myJob”)
        @SendTo(“myJob-delay")
        public String onMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack) {
            String json = (String) cr.value();
            JSONObject data = JSON.parseObject(json);
    
            if (/* 需要延迟处理 */){
                // 提交
                ack.acknowledge();
                // 发送到 @SendTo
               data.put("until", System.currentTimeMillis() + msToDelay);
               return data.toString();
            }
    
            // 正常处理
            // do real work
    
            // 提交
            ack.acknowledge();
            return null;
        }
    
        @KafkaListener(topics = “myJob-delay")
        @SendTo(“myJob")
        public String delayMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack){
            
            String json = (String) cr.value();
            JSONObject data = JSON.parseObject(json);
            Long until = data.getLong("until");
            // 阻塞直到 until
            while (System.currentTimeMillis() < until){
               Thread.sleep( Math.max(0, until - System.currentTimeMillis()) );
            }
    
            // 提交
            ack.acknowledge();
            // 转移到 @SendTo
            return json;
    
        }
    }

    代码很简单,不用解释也能看明白。稍微提一下几个重要的地方。

    @KafkaListener 的方法参数里有 Acknowledgment ack,这是AckMode.MANUAL 模式下必须要添加的参数。

    ack.acknowledge() 用来标记一条消息已经消费完成,即将从消息队列里移除。执行之前消息会一直保留在队列中,即时宕机重启后也能恢复。

    @SendTo 用来在队列(topic)间转移消息,只要 return 非 null 的数据。以上代码中,当需要延迟处理时,消息从 myJob 转移到 myJob-delay;而当条件满足时,消息又从 myJob-delay 转移到了 myJob。

    自从 spring-kafka 2.2.4 版本之后,可以在方法上定义 max.poll.interval.ms ,更加灵活了。例如

    @KafkaListener(topics = "myTopic", groupId = "group", properties = { 
        "max.poll.interval.ms:60000”, 
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100”}
    )

        以上是延迟消息处理的简单实现,适合延时要求不那么高的场合。朋友们想一下,假如延时比较复杂,执行的次序也不一定跟消息到达的次序一致,系统又该怎样设计呢?

    假如这篇文章对你有所帮助, 请关注我公众号, 发现更多有用的文章

     

     
     
    展开全文
  • Android应用程序消息处理机制

    千次下载 热门讨论 2013-10-23 01:22:30
    Android应用程序与传统的PC应用程序一样,都是消息驱动的。也就是说,在Android应用程序主线程中,所有函数都是在一个...掌握Android应用程序消息处理机制,有助于我们熟练地使用同步和异步编程,提高程序的运行性能
  • 4.4.4 消费者的消息处理语义

    千次阅读 2021-04-19 15:06:51
    4.4.4 消费者的消息处理语义 消费者从消息代理节点拉取到分区的消息后,对一条消息的处理语义有下面3种情况。 至多一次。消息最多被处理一次,可能会丢失,但绝不会重复传输。 至少一次。消息至少被处理一次,不...

    4.4.4 消费者的消息处理语义

    消费者从消息代理节点拉取到分区的消息后,对一条消息的处理语义有下面3种情况。

    • 至多一次。消息最多被处理一次,可能会丢失,但绝不会重复传输。
    • 至少一次。消息至少被处理一次,不可能丢失,但可能会重复传输。
    • 正好一次。消息正好被处理一次,不可能丢失,也不可能重复传输。

    消费者重新加入消费组会分配到新的分区,为了保证消费者能从分配分区的最近提交位置重新开始拉取井消费消息,消费者可以通过手动方式或定时任务提交分区的偏移量,保存分区的消费进度。消费者处理消息并更新消费进度,有下面几种不同的消息处理语义。

    1. 至多一次
      消费者读取消息,先保存消费进度,然后才处理消息。这样有可能会出现:消费者保存完消费进度,但在处理消息之前挂了。新的消费者会从保存的位置开始,但实际上在这个位置之前的消息可能并没有被真正处理。这种场景对应了“至多一次”的语义,即消息、有可能丢失(没有被处理)。Kafka消费者实现“至多一次”的做法是:设置消费者自动提交偏移量,并且设置较短的提交时间间隔。相关代码如下:
      在这里插入图片描述

    如图4-39所示,假设消费者l要处理3条消息,它们的偏移量分别是[1,2,3)。如果先提交偏移盐3,然后才开始处理,但消费者l只处理了第一条消息后就失败了,新的消费者2会从消费者l记录的偏移量3开始处理,导致没有处理第二条消息和第三条消息,即消息丢失了。
    在这里插入图片描述

    1. 至少一次

    消费者读取消息,先处理消息,最后才保存消费进度。这样有可能会出现:消费者处理完消息,但是在保存消费进度之前挂了。新的消费者从保存的位置开始,有可能会重新处理上一个消费者已经处理过的消息。这种场景对应了“至少一次”的语义,即消息有可能会被重复处理。Kafka消费者实现至少一次的做法是:设置消费者向动提交偏移量,但设置很长的提交间隔(或者关闭向动提交偏移盘)。在处理完消息后,手动调用同步模式的提交偏移量方法。相关代码如下:

    在这里插入图片描述

    如图4-40所示,消费者先处理消息,然后才提交偏移量。假设消费者l处理完消息6,但是在提交偏移韭6时失败了,这时偏移量仍然是上一次记录的偏移盘3。消费者2会从偏移fil3开始处理,就会重复处理第三条消息之后的第41516条消息,即消息被重复处理了。
    在这里插入图片描述

    1. 正好一次

    实现正好一次的消息处理语义有两种典型的解决方案:在保存消费进度和保存消费结果之间,引人两阶段提交协议;或者让消费者将消费进度和处理结果保存在同一个存储介质中。比如,将读取的数据和偏移盘一起存储到HDFS,确保数据和偏移量要么一起被更新,要么都不会更新。Kafka消费者实现正好一次的做法是:设置消费者不自动提交偏移量,订阅主题时设置自定义的消费者再平衡监昕器(ConsumerRebalanceli.stener)。相关代码如下:
    在这里插入图片描述

    消费者再平衡监昕器会在分区发生变化时,分别从外部存储系统写入或读取偏移量。只要能保证消费者处理消息的流程和写入偏移量到存储系统是一个原子操作,就可以实现正好一次的消息处理语义。相关代码如下:

    在这里插入图片描述

    注意:处理消息和保存偏移量必须是一个原子操作。如果不是原子操作,处理消息和保存偏移量之间发生失败,就又回到“至少一次”的场景。这里的关键还是原子操作,而不是说使用消费者再平衡监听器(ConsufTlerRebalanceli.stener)就可以实现“正好一次”的处理语义。监听器会在分区发生变化时读取或写入外部的偏移量存储,这个外部存储实际上l!tl协调节点没有太大区别。下一章分析协调者时,会详细分析“消费者再平衡监听器”的作用和执行流程。


    4.5 小结

    本章主要分析消费者新API的拉取消息流程,消费者消费消息主要和KafkaConsufTler类进行交互。客户端通过subscri.be()方法订阅指定的主题,然后调用poll()方法轮询。轮询主要分成3个步骤:通过拉取器发送拉取请求、通过消费者的网络客户端轮询、从拉取器中获取拉撒结果。下面列举了消费者消费消息以及发生再平衡操作时的具体步骤。

    (1)消费者分配到分区,订阅状态中的分区状态,初始时还没有拉取偏移量。
    (2)客户端轮询为没有拉取偏移茸的分区更新位置,会尝试从服务端协调节点读取分区的提交偏移量。
    (3)由于此时没有记录分区的提交偏移量,只能按照客户端设置的重置策略定位到最早或最近的位置。
    (4)消费者根据分区的拉取偏移量,从分区的主副本节点拉取消息,井更新分区状态的拉取偏移量。
    (5)分区有了拉取偏移量,自动提交偏移量的定时任务开始工作。
    (6)定时提交任务会将分区状态最新的拉取偏移量提交到服务端。
    (7)如果分区所有权没有变化,下次拉取消息时,已经存在拉取偏移量的分区不需要更新位置。
    (9)如果分区所有权发生变化,协调者会将分区重新分配给新的消费者。
    (10)新消费者之前没有分配该分区,会从服务端读取其他消费者之前提交的分区偏移量。
    (11)新消费者从分区最近的提交偏移量拉取数据,而且它的定时任务也会提交偏移盘到服务端。
    (12)协调者确保分区一定会分配给消费者,这样让分区一定会被消费者拉取并被消费。
    

    客户端在发送请求后返回一个异步请求对象,表示客户端会在未来的某个时刻收到服务端返回的响应结果。客户端可以在返回的异步请求上添加一个监听器,当异步请求完成时,就会自动触发监昕器的回调方法。异步请求还有其他高级的用法,比如组合模式、链接模式。组合模式返回的是一个新的异步请求,也可以在这个新异步请求上再添加一个监听器,形成组合加监昕器模式。

    使用异步请求的步骤有3步:调用发送请求返回异步请求、客户端轮询、获取异步请求的结果。客户端轮询有3种方式。

    • 快速轮询,调用该方法后会立即返回到主线程,这是无阻塞的轮询。
    • 带超时时间的轮询,如果在给定时间内没有结果返回,会返回到主线程,这是阻塞的轮询。
    • 没有时间限制的轮询,只有在异步请求完成后才会返回到主线程,这是阻塞的轮询。

    客户端发送请求得到的异步请求,它的泛型类型是客户端响应(Cl1.entResponse)。使用“组合加适配器”模式后,可以将客户端响应转换为自定义的类型。比如获取分区的偏移量(LIST_OFFSETS)返回的异步请求对象是RequestFuture<Long>,获取分区的提交偏移量(OFFSET_FETCH)对应类型是Map<Top’i.cPart’i.t’i.on,OffsetAndMetadata>,加入消费组对应类型是ByteBuffer,心跳和自动提交任务对应类型是RequestFuture<Vo’i.d>。下面列举了使用异步请求的3种做法,最后都要获取异步请求的结果,用于回调处理。

    第一种使用方式:客户端使用组合模式发送请求,返回异步请求对象后,立即调用client.poll(future)进行阻塞式地轮询操作。客户端只有在异步请求完成的时候,才可以获取异步请求的结果。相关代码如下:

    在这里插入图片描述

    第二种使用方式:为客户端发送请求返回的异步请求对象添加一个监听器,然后才开始阻塞式的轮询。异步请求监昕器囚调方法的参数是客户端响应对象,当客户端收到服务端的响应结果后,会先将客户端的响应结果设置为异步请求对象的结果值然后调用监昕器的回调方法。异步请求监听器回调方法的参数Cl’i.entResponse实际上就是异步请求对象的结果,它和future.value()数据一样。相关代码如下:

    在这里插入图片描述

    第三种使用方式:客户端使用组合发送请求,并给返回的异步请求对象添加一个监听器,然后开始阻塞式的轮询。使用组合模式发送请求和第一种方式一样,给异步请求对象添加监听器和第二种模式一样,所以第三种实际上结合了两种模式,它和第二种模式的区别是异步请求的类型为ByteBuffer,而不是客户端响应。第三种方式的执行步骤和第二种一样,最后都会调用异步请求监昕器的回调。相关代码如下:

    在这里插入图片描述

    如图4-41所示,以组合模式的异步请求为例,客户端发送请求并获取响应结果的具体步骤如下。

    (1)客户端调用sendRequest()向服务端节点发送请求。
    (2)客户端不需要等待服务端返回结果,返回异步请求。
    (3)在步骤(2)返回的异步请求上添加一个监昕器。
    (4)客户端在异步请求上轮询,会阻塞式地等待请求完成。
    (5)如果异步请求没有完成,则继续轮询。
    (6)当收到服务端返回的响应结果后,调用handleResponse()回调方法。
    (7)在回调方法中会解析出客户端响应,调用异步请求的complete()方法,完成异步请求。
    (8)从客户端响应对象解析出来的数据,会被设置为异步请求的结果值。
    (9)客户端调用异步请求的value()方法,获取异步请求的结果。
    

    在这里插入图片描述

    图4-42总结Kafka的消费者和其他组件的关系。Kafka消费者主要有拉取器、消费者的协调者两个主要的类。拉取器会向服务端拉取消息,消费者的协调者会发送心跳和提交偏移量给服务端的协调者节点。属于同一个消费组的所有消费者涉及消费组相关的请求,都会和服务端的协调者节点通信。

    在这里插入图片描述

    本章主要分析的是一个消费者的处理流程,并没有和其他消费者联系起来,以全局的视角来解
    消费组整体的行为。消费者的大部分工作都要和服务端的协调者通信,服务端的协调者会管理同一个消费组的所有消费者。下一章会分析消费者发送请求后,在服务端的协调者上的处理流程。

    展开全文
  • 我们都知道,Android UI是线程不安全的,如果在子线程中尝试进行UI操作,...这种处理方式被称为异步消息处理线程,虽然我相信大家都会用,可是你知道它背后的原理是什么样的吗?今天我们就来一起深入探究一下Handler和

    转载请注明出处:http://blog.csdn.net/guolin_blog/article/details/9991569


    之前也是由于周末通宵看TI3比赛,一直没找到时间写博客,导致已经有好久没更新了。惭愧!后面还会恢复进度,尽量保证每周都写吧。这里也是先恭喜一下来自瑞典的Alliance战队夺得了TI3的冠军,希望明年中国战队能够虎起!


    开始进入正题,我们都知道,Android UI是线程不安全的,如果在子线程中尝试进行UI操作,程序就有可能会崩溃。相信大家在日常的工作当中都会经常遇到这个问题,解决的方案应该也是早已烂熟于心,即创建一个Message对象,然后借助Handler发送出去,之后在Handler的handleMessage()方法中获得刚才发送的Message对象,然后在这里进行UI操作就不会再出现崩溃了。


    这种处理方式被称为异步消息处理线程,虽然我相信大家都会用,可是你知道它背后的原理是什么样的吗?今天我们就来一起深入探究一下Handler和Message背后的秘密。


    首先来看一下如何创建Handler对象。你可能会觉得挺纳闷的,创建Handler有什么好看的呢,直接new一下不就行了?确实,不过即使只是简单new一下,还是有不少地方需要注意的,我们尝试在程序中创建两个Handler对象,一个在主线程中创建,一个在子线程中创建,代码如下所示:

    public class MainActivity extends Activity {
    	
    	private Handler handler1;
    	
    	private Handler handler2;
    
    	@Override
    	protected void onCreate(Bundle savedInstanceState) {
    		super.onCreate(savedInstanceState);
    		setContentView(R.layout.activity_main);
    		handler1 = new Handler();
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				handler2 = new Handler();
    			}
    		}).start();
    	}
    
    }
    如果现在运行一下程序,你会发现,在子线程中创建的Handler是会导致程序崩溃的,提示的错误信息为 Can't create handler inside thread that has not called Looper.prepare() 。说是不能在没有调用Looper.prepare() 的线程中创建Handler,那我们尝试在子线程中先调用一下Looper.prepare()呢,代码如下所示:
    new Thread(new Runnable() {
    	@Override
    	public void run() {
    		Looper.prepare();
    		handler2 = new Handler();
    	}
    }).start();
    果然这样就不会崩溃了,不过只满足于此显然是不够的,我们来看下Handler的源码,搞清楚为什么不调用Looper.prepare()就不行呢。Handler的无参构造函数如下所示:
    public Handler() {
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                    klass.getCanonicalName());
            }
        }
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;
        mCallback = null;
    }
    可以看到,在第10行调用了Looper.myLooper()方法获取了一个Looper对象,如果Looper对象为空,则会抛出一个运行时异常,提示的错误正是 Can't create handler inside thread that has not called Looper.prepare()!那什么时候Looper对象才可能为空呢?这就要看看Looper.myLooper()中的代码了,如下所示:
    public static final Looper myLooper() {
        return (Looper)sThreadLocal.get();
    }
    这个方法非常简单,就是从sThreadLocal对象中取出Looper。如果sThreadLocal中有Looper存在就返回Looper,如果没有Looper存在自然就返回空了。因此你可以想象得到是在哪里给sThreadLocal设置Looper了吧,当然是Looper.prepare()方法!我们来看下它的源码:
    public static final void prepare() {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper());
    }

    可以看到,首先判断sThreadLocal中是否已经存在Looper了,如果还没有则创建一个新的Looper设置进去。这样也就完全解释了为什么我们要先调用Looper.prepare()方法,才能创建Handler对象。同时也可以看出每个线程中最多只会有一个Looper对象。


    咦?不对呀!主线程中的Handler也没有调用Looper.prepare()方法,为什么就没有崩溃呢?细心的朋友我相信都已经发现了这一点,这是由于在程序启动的时候,系统已经帮我们自动调用了Looper.prepare()方法。查看ActivityThread中的main()方法,代码如下所示:

    public static void main(String[] args) {
        SamplingProfilerIntegration.start();
        CloseGuard.setEnabled(false);
        Environment.initForCurrentUser();
        EventLogger.setReporter(new EventLoggingReporter());
        Process.setArgV0("<pre-initialized>");
        Looper.prepareMainLooper();
        ActivityThread thread = new ActivityThread();
        thread.attach(false);
        if (sMainThreadHandler == null) {
            sMainThreadHandler = thread.getHandler();
        }
        AsyncTask.init();
        if (false) {
            Looper.myLooper().setMessageLogging(new LogPrinter(Log.DEBUG, "ActivityThread"));
        }
        Looper.loop();
        throw new RuntimeException("Main thread loop unexpectedly exited");
    }
    可以看到,在第7行调用了Looper.prepareMainLooper()方法,而这个方法又会再去调用Looper.prepare()方法,代码如下所示:
    public static final void prepareMainLooper() {
        prepare();
        setMainLooper(myLooper());
        if (Process.supportsProcesses()) {
            myLooper().mQueue.mQuitAllowed = false;
        }
    }

    因此我们应用程序的主线程中会始终存在一个Looper对象,从而不需要再手动去调用Looper.prepare()方法了。


    这样基本就将Handler的创建过程完全搞明白了,总结一下就是在主线程中可以直接创建Handler对象,而在子线程中需要先调用Looper.prepare()才能创建Handler对象。


    看完了如何创建Handler之后,接下来我们看一下如何发送消息,这个流程相信大家也已经非常熟悉了,new出一个Message对象,然后可以使用setData()方法或arg参数等方式为消息携带一些数据,再借助Handler将消息发送出去就可以了,示例代码如下:

    new Thread(new Runnable() {
    	@Override
    	public void run() {
    		Message message = new Message();
    		message.arg1 = 1;
    		Bundle bundle = new Bundle();
    		bundle.putString("data", "data");
    		message.setData(bundle);
    		handler.sendMessage(message);
    	}
    }).start();

    可是这里Handler到底是把Message发送到哪里去了呢?为什么之后又可以在Handler的handleMessage()方法中重新得到这条Message呢?看来又需要通过阅读源码才能解除我们心中的疑惑了,Handler中提供了很多个发送消息的方法,其中除了sendMessageAtFrontOfQueue()方法之外,其它的发送消息方法最终都会辗转调用到sendMessageAtTime()方法中,这个方法的源码如下所示:

    public boolean sendMessageAtTime(Message msg, long uptimeMillis)
    {
        boolean sent = false;
        MessageQueue queue = mQueue;
        if (queue != null) {
            msg.target = this;
            sent = queue.enqueueMessage(msg, uptimeMillis);
        }
        else {
            RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
        }
        return sent;
    }

    sendMessageAtTime()方法接收两个参数,其中msg参数就是我们发送的Message对象,而uptimeMillis参数则表示发送消息的时间,它的值等于自系统开机到当前时间的毫秒数再加上延迟时间,如果你调用的不是sendMessageDelayed()方法,延迟时间就为0,然后将这两个参数都传递到MessageQueue的enqueueMessage()方法中。这个MessageQueue又是什么东西呢?其实从名字上就可以看出了,它是一个消息队列,用于将所有收到的消息以队列的形式进行排列,并提供入队和出队的方法。这个类是在Looper的构造函数中创建的,因此一个Looper也就对应了一个MessageQueue。


    那么enqueueMessage()方法毫无疑问就是入队的方法了,我们来看下这个方法的源码:

    final boolean enqueueMessage(Message msg, long when) {
        if (msg.when != 0) {
            throw new AndroidRuntimeException(msg + " This message is already in use.");
        }
        if (msg.target == null && !mQuitAllowed) {
            throw new RuntimeException("Main thread not allowed to quit");
        }
        synchronized (this) {
            if (mQuiting) {
                RuntimeException e = new RuntimeException(msg.target + " sending message to a Handler on a dead thread");
                Log.w("MessageQueue", e.getMessage(), e);
                return false;
            } else if (msg.target == null) {
                mQuiting = true;
            }
            msg.when = when;
            Message p = mMessages;
            if (p == null || when == 0 || when < p.when) {
                msg.next = p;
                mMessages = msg;
                this.notify();
            } else {
                Message prev = null;
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
                msg.next = prev.next;
                prev.next = msg;
                this.notify();
            }
        }
        return true;
    }

    首先你要知道,MessageQueue并没有使用一个集合把所有的消息都保存起来,它只使用了一个mMessages对象表示当前待处理的消息。然后观察上面的代码的16~31行我们就可以看出,所谓的入队其实就是将所有的消息按时间来进行排序,这个时间当然就是我们刚才介绍的uptimeMillis参数。具体的操作方法就根据时间的顺序调用msg.next,从而为每一个消息指定它的下一个消息是什么。当然如果你是通过sendMessageAtFrontOfQueue()方法来发送消息的,它也会调用enqueueMessage()来让消息入队,只不过时间为0,这时会把mMessages赋值为新入队的这条消息,然后将这条消息的next指定为刚才的mMessages,这样也就完成了添加消息到队列头部的操作。


    现在入队操作我们就已经看明白了,那出队操作是在哪里进行的呢?这个就需要看一看Looper.loop()方法的源码了,如下所示:
    public static final void loop() {
        Looper me = myLooper();
        MessageQueue queue = me.mQueue;
        while (true) {
            Message msg = queue.next(); // might block
            if (msg != null) {
                if (msg.target == null) {
                    return;
                }
                if (me.mLogging!= null) me.mLogging.println(
                        ">>>>> Dispatching to " + msg.target + " "
                        + msg.callback + ": " + msg.what
                        );
                msg.target.dispatchMessage(msg);
                if (me.mLogging!= null) me.mLogging.println(
                        "<<<<< Finished to    " + msg.target + " "
                        + msg.callback);
                msg.recycle();
            }
        }
    }
    可以看到,这个方法从第4行开始,进入了一个死循环,然后不断地调用的MessageQueue的next()方法,我想你已经猜到了,这个next()方法就是消息队列的出队方法。不过由于这个方法的代码稍微有点长,我就不贴出来了,它的简单逻辑就是如果当前MessageQueue中存在mMessages(即待处理消息),就将这个消息出队,然后让下一条消息成为mMessages,否则就进入一个阻塞状态,一直等到有新的消息入队。继续看loop()方法的第14行,每当有一个消息出队,就将它传递到msg.target的dispatchMessage()方法中,那这里msg.target又是什么呢?其实就是Handler啦,你观察一下上面sendMessageAtTime()方法的第6行就可以看出来了。接下来当然就要看一看Handler中dispatchMessage()方法的源码了,如下所示:
    public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }
    在第5行进行判断,如果mCallback不为空,则调用mCallback的handleMessage()方法,否则直接调用Handler的handleMessage()方法,并将消息对象作为参数传递过去。这样我相信大家就都明白了为什么handleMessage()方法中可以获取到之前发送的消息了吧!


    因此,一个最标准的异步消息处理线程的写法应该是这样:

    class LooperThread extends Thread {
          public Handler mHandler;
    
          public void run() {
              Looper.prepare();
    
              mHandler = new Handler() {
                  public void handleMessage(Message msg) {
                      // process incoming messages here
                  }
              };
    
              Looper.loop();
          }
      }

    当然,这段代码是从Android官方文档上复制的,不过大家现在再来看这段代码,是不是理解的更加深刻了?


    那么我们还是要来继续分析一下,为什么使用异步消息处理的方式就可以对UI进行操作了呢?这是由于Handler总是依附于创建时所在的线程,比如我们的Handler是在主线程中创建的,而在子线程中又无法直接对UI进行操作,于是我们就通过一系列的发送消息、入队、出队等环节,最后调用到了Handler的handleMessage()方法中,这时的handleMessage()方法已经是在主线程中运行的,因而我们当然可以在这里进行UI操作了。整个异步消息处理流程的示意图如下图所示:




    另外除了发送消息之外,我们还有以下几种方法可以在子线程中进行UI操作:


    1. Handler的post()方法

    2. View的post()方法

    3. Activity的runOnUiThread()方法


    我们先来看下Handler中的post()方法,代码如下所示:

    public final boolean post(Runnable r)
    {
       return  sendMessageDelayed(getPostMessage(r), 0);
    }
    原来这里还是调用了sendMessageDelayed()方法去发送一条消息啊,并且还使用了getPostMessage()方法将Runnable对象转换成了一条消息,我们来看下这个方法的源码:
    private final Message getPostMessage(Runnable r) {
        Message m = Message.obtain();
        m.callback = r;
        return m;
    }
    在这个方法中将消息的callback字段的值指定为传入的Runnable对象。咦?这个callback字段看起来有些眼熟啊,喔!在Handler的dispatchMessage()方法中原来有做一个检查,如果Message的callback等于null才会去调用handleMessage()方法,否则就调用handleCallback()方法。那我们快来看下handleCallback()方法中的代码吧:
    private final void handleCallback(Message message) {
        message.callback.run();
    }
    也太简单了!竟然就是直接调用了一开始传入的Runnable对象的run()方法。因此在子线程中通过Handler的post()方法进行UI操作就可以这么写:
    public class MainActivity extends Activity {
    
    	private Handler handler;
    
    	@Override
    	protected void onCreate(Bundle savedInstanceState) {
    		super.onCreate(savedInstanceState);
    		setContentView(R.layout.activity_main);
    		handler = new Handler();
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				handler.post(new Runnable() {
    					@Override
    					public void run() {
    						// 在这里进行UI操作
    					}
    				});
    			}
    		}).start();
    	}
    }
    虽然写法上相差很多,但是原理是完全一样的,我们在Runnable对象的run()方法里更新UI,效果完全等同于在handleMessage()方法中更新UI。


    然后再来看一下View中的post()方法,代码如下所示:

    public boolean post(Runnable action) {
        Handler handler;
        if (mAttachInfo != null) {
            handler = mAttachInfo.mHandler;
        } else {
            ViewRoot.getRunQueue().post(action);
            return true;
        }
        return handler.post(action);
    }
    原来就是调用了Handler中的post()方法,我相信已经没有什么必要再做解释了。


    最后再来看一下Activity中的runOnUiThread()方法,代码如下所示:

    public final void runOnUiThread(Runnable action) {
        if (Thread.currentThread() != mUiThread) {
            mHandler.post(action);
        } else {
            action.run();
        }
    }
    如果当前的线程不等于UI线程(主线程),就去调用Handler的post()方法,否则就直接调用Runnable对象的run()方法。还有什么会比这更清晰明了的吗?


    通过以上所有源码的分析,我们已经发现了,不管是使用哪种方法在子线程中更新UI,其实背后的原理都是相同的,必须都要借助异步消息处理的机制来实现,而我们又已经将这个机制的流程完全搞明白了,真是一件一本万利的事情啊。


    关注我的技术公众号,每天都有优质技术文章推送。关注我的娱乐公众号,工作、学习累了的时候放松一下自己。

    微信扫一扫下方二维码即可关注:

            

    展开全文
  • Android WebKit消息处理

    千次阅读 2014-02-11 00:42:21
    Android WebKit的消息处理

                                    Android WebKit消息处理


    前言

    在Android API中,用WebView的形式向开发者提供WebKit的接口以及特性。WebView其实是对WebKit的封装以及扩展,在android4.4里面,已经WebKit已经换成Chromium(在后续博客中会对android4.4 的WebView/Chromium进行详细讲解)。

    WebKit消息处理框架的搭建

    整个WebKit主要分为2个线程,一个是Ui线程,也就是应用程序使用WebView所在的主线程,另一个WebCore线程。WebView的消息处理,主要是Ui线程和WebCore线程的交互。一部分Ui线程向WebCore发送的命令操作消息,例如LOAD_URL,另一部分是来自Ui的touch消息。

    主要涉及到的类如图所示:


    其中,WebViewClassic是WebView的Provider,或者说是delegate,凡是涉及到消息处理或者需要跟WebCore交互的接口,都是直接调用WebViewClassic的同名函数。

    WebViewInputDispatcher就是用来处理Ui的touch事件的。后面会专门讲解WebKit的touch事件传递以及处理流程的。本章就不展开讲解了。

    WebView的初始化

    时序图如下:


    上图其实就是WebView/WebKit的初始化流程,主要是对WebViewClassic, WebCore, WebViewInputDispatcher进行初始化,为WebKit资源请求前做准备。WebView构造函数中会调用createWebView,这个过程其实就是创建WebView的provider,实际返回的对象就是WebViewClassic,紧接着init初始化WebViewClassic,在WebViewClassic的init中会new WebViewCore(),之后对WebViewCore进行初始化,在WebViewCore自己初始化完毕之后,表明现在WebCore已经可以处理来自WebView的消息了,包括touch事件,此时WebViewCore会发送消息给WebViewClassic,也就是上图中看到的WEBCORE_INITIALIZED_MSG_ID,WebViewClassic收到此消息之后,就会初始化touch事件的分发器:WebViewInputDispatcher。这个流程结束之后,应用就可以通过WebView,loadUrl了。

    从这个图中,我们看到,在WebCore的构造函数中会new WebCoreThread,这个线程就是上面提到的WebCore线程。到此为止,我们已经可以看到有2个线程了,一个是WebView所在的Ui线程,另一个就是WebCore线程。

    那么WebKit的消息处理究竟怎么工作的呢?

    在WebView中,消息在线程间的分发使用的是Handler。在WebKit的消息分发机制中的总共有三个Handler如下:


    mPrivateHandler:

    在WebViewClassic中创建,用来分发和处理UI相关消息,例如重绘,touch事件,另外就是负责跟WebCore线程交互。

    sWebCoreHandler:

    在WebCoreThread中间,主要负责WebViewCore初始化、WebViewCoreWatchDog心跳、WebCore线程优先级别调整。

    mHandler:

    WebCore线程消息循环最主要的handler。任何需要调用WebCore接口的,都需要通过mHandler  send到WebCore线程中去。

    在EventHub的transferMessages()中被new出来的,由于transferMessages()实在WebViweCore的initialize()中被调用的,所以,EventHub的mHandler也是在WebCore线程中。

    Ui线程第一次向WebCore线程发送的消息,并没有直接被分发到WebCore线程中去。而是被缓存在WebViewCore中的mMessages list中,因为有可能在WebKit的消息处理框架还未初始化完毕,Ui线程就已经开始向WebCore线程发送消息了。所以,当WebViewCore最后初始化完毕之后,会调用transferMessages(),在transferMessages中将mMessages中的消息通过mHandler全部send到WebCore线程中去。


    到这里已经应该很明白了:

    Ui线程的消息通过WebViewClassic的mPrivateHandler处理。

    WebCore线程的消息通过EventHub的mHandler和WebViewCore的sWebCoreHandler处理。各个Handler之间可以相互send消息到对方的消息队列中去。


    版权申明:
    转载文章请注明原文出处,任何用于商业目的,请联系本人:hyman_tan@126.com


    展开全文
  • Windows消息处理

    千次阅读 2015-07-09 15:03:29
    这里简述一下关于window消息处理,主要在实际应用中; 在一个Windows程序中,一个程序的的消息是通过一个大循环来实现接收处理的; 这个可以查看win32程序的编写,具体可以详细查找相关资料; 在Windows编程中,...
  • Android Handler消息处理:4个主要参与对象(Handler 消息发送接收处理类 + Message消息对象 + MessageQuene消息队列 + Looper:每个线程只拥有一个Looper,以先进先出的方式负责从MessageQuene消息队列里面读取...
  • rabbitmq消息队列设置过期时间和过期消息处理 适用场景 电商秒杀抢购活动中处理用户下单和付款时间不一致,设置过期时间,过期则不允许付款 参考 ...
  • Duilib消息处理流程图解

    千次阅读 2017-01-10 10:00:33
    要想熟练运用Duilib,熟悉他的消息处理机制是必须的。 网上找了一篇Duilib消息处理剖析,http://blog.csdn.net/rankun1/article/details/54099395 总结的很详细,这里进行梳理了一下,整理了一份不是很规范的...
  • 理解Qt消息机制刻不容缓,那我们从对比传统的windows消息处理机制对比来说起; 只有知道QT底层的消息处理、对我们理解并学习Qt有很大帮助; 下面我将对windows程序与Qt对比,并在核心代码处并给出注释进行对比、方便...
  • 本系列文章由zhmxy555编写,转载请注明出处。 ...键盘加鼠标作为目前人机交互方式依旧的主流,在讲完键盘消息处理之后接着讲鼠标消息处理,自然是理所当然的。 这一
  • MFC中主窗口和创建的模态窗口中同时有一段消息处理函数,处理同一个消息,如何才能实现让创建的窗口处理而不是主窗口去处理(现在就是主窗口截获消息并处理的)。。。。
  • Python进阶_wxpy学习:消息处理

    千次阅读 2018-07-10 09:19:12
    消息处理 消息对象 内容数据 用户相关 群聊相关 回复方法 转发消息 自动处理消息 开始运行 示例代码 已发送消息 历史消息 前言 学习完了python的基本概念和相应操作后,就要进入实战阶段了,首先选的和微信...
  • 今天做老师留下来的作业题,莫名其妙出现Bug,然后发现是MFC消息处理机制的问题,在响应鼠标双击前,会先产生一个单击鼠标消息,这里是我的解决方法。 单击响应: void CMouseView::OnLButtonDown(UINT nFlags, ...
  • View 的 post 消息处理,原理解析 在开发中,我们经常会使用某个视图组件(View 的实例)的 post 方法或 postDelayed 方法,用于将操作发送给主线程执行,或者延迟执行某项任务。这种方式非常的方便,那么这么高频的...
  • 百万用户消息处理

    万次阅读 2019-11-22 11:12:36
    每条消息内容假设占内存1KB(约512个汉字),100W用户的消息同时放入内存约占976.56MB。-----大消息 每条消息内容假设占内存0.12KB(约58个汉字),100W用户的消息同时放入内存约占107.56MB。-----本次需求消息...
  • WINDOWS消息处理过程

    千次阅读 2015-04-07 10:42:35
    WINDOWS消息处理过程   一、引言 二、Windows消息机制的概念 1、DOS与Windows驱动机制的区别 2、消息 3、消息的来源 4、Windows的消息系统的组成 5、消息的响应 三、Windows消息机制要点 1. 窗口过程 2 ...
  • Delphi中,自定义消息处理过程与Windows消息处理过程的定义是一样的。 1.主用步骤: 1>. 首先定义一个消息标识符常量;如:WM_MYMessage = WM_USER+ 5;(标识符常量的取值范围为WM_USER~WM_APP-1) 2>. 在单元...
  • Windows消息处理机制

    万次阅读 2020-06-19 01:13:35
    消息与事件 Windows中的事件是一个“动作”,这个动作可能是用户操作应用程序产生的,也可能是Windows自己产生的。 Windows为了能够准确的描述这些信息,提供了一个结构体:MSG,该结构体里面记录的事件的详细...
  • 消息处理引擎讲解

    千次阅读 2013-11-05 11:59:08
    类图结构如上图,数据流程,外部数据流入消息处理管理对象,流入消息队列,分发器从消息队列中取数据,从处理器工厂中取处理器来处理数据,消息处理管理类提供注册处理器接口,具体代码实现见:...
  • Android应用程序消息处理机制(Looper、Handler)分析

    万次阅读 多人点赞 2011-09-29 00:58:04
    应用程序的主线程不断地从这个消息队例中获取消息(Looper),然后对这些消息进行处理(Handler),这样就实现了通过消息来驱动应用程序的执行,本文将详细分析Android应用程序的消息处理机制。 前面我们学习...
  • Android WebKit消息处理(二)Touch事件的分发处理。详细分析了Android WebKit对于Touch输入事件的处理
  • duilib鼠标键盘消息处理

    万次阅读 2015-01-15 16:14:13
    (想知道duilib整体的消息处理过程,请参考本博客其他文章) 首先用自己的语言描述几个名词: 准事件控件m_pEventClick:在WM_LBUTTONDOWN,WM_RBUTTONDOWN,WM_LBUTTONDBLCLK消息中设置,在WM_LBUTTONUP,WM_...
  • 【消息队列】MSMQ(二)——消息处理流程

    千次阅读 热门讨论 2017-03-18 20:13:24
    所以在这篇博客中小编就和大家一起进行一些对消息的处理,包括了创建消息、发送消息、接收消息、异步消息处理。其中整体上还是比较相似的,在其中的一些过程中我们可以添加事务来保证操作的完整性。二、消息的处理...
  • 如题.最近的一个面试题,说是考虑kafka理论特性.具体要求我可能有理解错误.如果各位有研究一眼看出是什么问题,谢谢...然而此时全部消费者1s只能消费5000,消息处理是纯CPU计算,问:在不添加分区的情况下如何消息处理速度?
  • 一般情况下, 添加常用的消息的消息处理函数的过程为: 点击要添加消息处理函数的类名->在属性界面内选择对应的消息->添加消息处理函数即可.但是, 属性列表里自带的消息处理函数有限. 那么如何添加别的消息的消息处理...
  • MFC消息处理流程概述

    万次阅读 2012-10-07 13:59:54
    本文试图粗略展示出MFC下消息处理的基本流程。 一、先看一下Win32下的消息处理流程 每一个线程都对应有一个消息队列,利用API函数GetMessage从消息队列中获取消息,然后利用TranslateMessage翻译消息(主要是一些...
  • 下面几节将分析MFC的消息机制的实现原理和消息处理的过程。为此,首先要分析ClassWizard实现消息映射的内幕,然后讨论MFC的窗口过程,分析MFC窗口过程是如何实现消息处理的。 消息映射的定义和实现 MFC处理的...
  • 异步消息处理线程启动后会进入一个无限的循环体之中,每循环一次,从其内部的消息队列中取出一个消息,然后回调相应的消息处理函数,执行完成一个消息后则继续循环。若消息队列为空,线程则会阻塞等待。 说了这一堆...
  • Android消息处理机制深度解析笔记

    千次阅读 2016-05-18 17:36:11
    Android消息处理机制深度解析笔记前言很多程序猿(媛)都对消息处理机制做过分析,大家都基本了解了MessageQueue、Handler、Looper之间相互之间怎么协同工作,但是具体到消息是如何传递,取出,如何处理的过程并不是...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,344,010
精华内容 537,604
关键字:

消息处理