精华内容
下载资源
问答
  • Android Handler 异步消息机制

    千次阅读 2012-02-29 22:40:37
    Android Handler 异步消息机制  Handler基本概念:  Handler主要用于异步消息的处理:当发出一个消息之后,首先进入一个消息队列,发送消息的函数即刻返回,而另外一个部分逐个的在消息队列中将消息取出,然后...
    Android Handler 异步消息机制 

    Handler基本概念:
          Handler主要用于异步消息的处理:当发出一个消息之后,首先进入一个消息队列,发送消息的函数即刻返回,而另外一个部分逐个的在消息队列中将消息取出,然后对消息进行出来,就是发送消息和接收消息不是同步的处理。 这种机制通常用来处理相对耗时比较长的操作。

    Handler 常用方法:
    post(Runnable)
            postAtTime(Runnable,long)
            postDelayed(Runnable long)
            sendEmptyMessage(int)
            sendMessage(Message)
            sendMessageAtTime(Message,long)
            sendMessageDelayed(Message,long)
    以上post类方法允许你排列一个Runnable对象到主线程队列中,
            sendMessage类方法, 允许你安排一个带数据的Message对象到队列中,等待更新.


         个人认为Android 中Handler 很像 Web开发中的Ajax。拿jquery 的$.ajax()方法来举例:
    $.ajax({
         url:"xxxx.jsp",
         dataType:"text",
         success:function(message){ // 处理返回的结果}
          })

    首先我们按老规矩先看Demo的效果图:






    1、点击 测试 按钮的开启一条线程处理其他比较耗时的业务,相当于 一个Web页面点击页面的某个按钮后调用javascript 的ajax方法去后台去数据,而本身的页面是没变的。

    2、是否是异步的呢?从图二可以看出。先打印出了线程中 “start Thread”,才执行 onClick 方法中的 “OnClick........” 

    3、下面代码效果等于ajax请求的后台的响应,那Java来说就是 printWriter.prinlt(1);

    1. Thread thread = new Thread()
    2.     {
    3.             public void run()
    4.             {
    5.                     Log.i(TAG, "start Thread");
    6.                     
    7.                     //发送一个空消息到消息队列里面
    8.                     //此方法相当于后台往前台Ajax响应结果,在Java当中,相当于一个Action方法里面out.println(1);
    9.                     handler.sendEmptyMessage(1);
    10.             };
    11.     };
    复制代码

    4、 以下代码相当于ajax的 success:function(message){if(message==1){//do something}}
    1. if(msg.what ==1)
    2.                     {
    3.                             txtTest.setText("异步处理结果 === Handler ");
    4.                     }
    复制代码


    下面看一下具体代码是怎么实现的:

    main.xml文件:
    1. <?xml version="1.0" encoding="utf-8"?>
    2. <LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
    3.     android:orientation="vertical"
    4.     android:layout_width="fill_parent"
    5.     android:layout_height="fill_parent"
    6.     >
    7. <TextView  
    8.         android:id="@+id/txtTest"
    9.     android:layout_width="fill_parent" 
    10.     android:layout_height="wrap_content" 
    11.     android:text="HandlerTest"
    12.     />
    13.     
    14.    
    15.    <Button android:id="@+id/btnTest"  
    16.            android:layout_width="fill_parent" 
    17.     android:layout_height="wrap_content" 
    18.     android:text="测试"
    19.     />
    20. </LinearLayout>
    复制代码

    Activity:
    1. package com.droidstouc.handler.test;

    2. import android.app.Activity;
    3. import android.os.Bundle;
    4. import android.os.Handler;
    5. import android.util.Log;
    6. import android.view.View;
    7. import android.view.View.OnClickListener;
    8. import android.widget.Button;
    9. import android.widget.TextView;

    10. /**
    11. * Android Handler 异步消息处理机制
    12. * @author Administrator
    13. *
    14. */
    15. public class HandlerTestActivity extends Activity {
    16.    
    17.         
    18.         
    19.         private static final String TAG="HandlerTestActivity";
    20.         
    21.         private Button btnTest;
    22.         private TextView  txtTest;
    23.         
    24.         
    25.     public void onCreate(Bundle savedInstanceState) {
    26.         super.onCreate(savedInstanceState);
    27.         setContentView(R.layout.main);
    28.         
    29.         
    30.         txtTest = (TextView) this.findViewById(R.id.txtTest);
    31.         btnTest= (Button) this.findViewById(R.id.btnTest);
    32.         
    33.         //设置 OnClick **        
    34.         btnTest.setOnClickListener(new BtnTestOnClickListener());
    35.     }
    36.     
    37.     
    38.     
    39.     //点击测试按钮后调用 BtnTestOnClickListener 的 OnClick 方法
    40.     class BtnTestOnClickListener implements OnClickListener
    41.     {
    42.                 @Override
    43.                 public void onClick(View v)
    44.                 {
    45.                 /*        //把线程对象放到handler的队列中,线程会马上启动执行
    46.                         handler.post(thread);*/
    47.                         
    48.                         //启动线程
    49.                         thread.start();
    50.                         try
    51.                         {
    52.                                 // 为了看到异步效果,我让当前线程停止了2秒钟
    53.                                 Thread.sleep(2000);
    54.                         }
    55.                         catch (InterruptedException e)
    56.                         {
    57.                                 e.printStackTrace();
    58.                         }
    59.                         
    60.                         Log.i(TAG, "OnClick........");
    61.                 }
    62.             
    63.     }
    64.     
    65.     
    66.    // 定义一个Handler,用来异步处理数据
    67.     Handler handler = new Handler()
    68.     {
    69.             //相当于jquery $.ajax方法中的 Success:function(){}
    70.             
    71.             public void handleMessage(android.os.Message msg) 
    72.             {
    73.                     // 对线程中 handler 返回的结果进行处理
    74.                     
    75.                     Log.i(TAG, "结果返回,正在处理");
    76.                     if(msg.what ==1)
    77.                     {
    78.                             txtTest.setText("异步处理结果 === Handler ");
    79.                     }
    80.                     
    81.             };
    82.     };
    83.     
    84.     
    85.     
    86.     Thread thread = new Thread()
    87.     {
    88.             public void run()
    89.             {
    90.                     Log.i(TAG, "start Thread");
    91.                     
    92.                     //发送一个空消息到消息队列里面
    93.                     //此方法相当于后台往前台Ajax响应结果,在Java当中,相当于一个Action方法里面out.println(1);
    94.                     handler.sendEmptyMessage(1);
    95.             };
    96.     };
    97. }
    复制代码

    展开全文
  • 上篇说了半天,却回避了一个重要的问题:为什么要用异步呢,它有什么样的好处?坦率的说,我对这点的认识不是太深刻(套句俗语,只可意会,不可言传)。还是举个例子吧: 比如Client向Server发送一个request,...

    上篇说了半天,却回避了一个重要的问题:为什么要用异步呢,它有什么样的好处?坦率的说,我对这点的认识不是太深刻(套句俗语,只可意会,不可言传)。还是举个例子吧:
    比如Client向Server发送一个request,Server收到后需要100ms的处理时间,为了方便起见,我们忽略掉网络的延迟,并且,我们认为Server端的处理能力是无穷大的。在这个use case下,如果采用同步机制,即Client发送request -> 等待结果 -> 继续发送,那么,一个线程一秒钟之内只能够发送10个request,如果希望达到10000 request/s的发送压力,那么Client端就需要创建1000个线程,而这么多线程的context switch就成为client的负担了。而采用异步机制,就不存在这个问题了。Client将request发送出去后,立即发送下一个request,理论上,它能够达到网卡发送数据的极限。当然,同时需要有机制不断的接收来自Server端的response。

    以上的例子其实就是这篇的主题,异步的消息机制,基本的流程是这样的:

    如果仔细琢磨的话,会发现这个流程中有两个很重要的问题需要解决:
    1. 当client接收到response后,怎样确认它到底是之前哪个request的response呢?
    2. 如果发送一个request后,这个request对应的response由于种种原因(比如server端出问题了)一直没有返回。client怎么能够发现类似这样长时间没有收到response的request呢?

    对于第一个问题,一般会尝试给每个request分配一个独一无二的ID,返回的Response会同时携带这个ID,这样就能够将request和response对应上了。
    对于第二个问题,需要有一个timeout机制,对于每一个request都有一个定时器,如果到指定时间仍然没有返回结果,那么会触发timeout操作。多说一句,timeout机制其实对于涉及网络的同步机制也是非常有必要的,因为有可能client与server之间的链接坏了,在极端情况下,client会被一直阻塞住。

    纸上谈兵了这么久,还是看一个实际的例子。我在这里用Hadoop的RPC代码举例。这里需要事先说明的是,Hadoop的RPC对外的接口其实是同步的,但是,RPC的内部实现其实是异步消息机制。多说无益,直接看代码吧(讨论的所有代码都在org.apache.hadoop.ipc.Client.java里):

    1. public Writable call(Writable param, ConnectionId remoteId)    
    2.     throws InterruptedException, IOException {  
    3.     //具体的代码一会再看...  
    4. }  

    这就是Client.java对外提供的接口。一共有两个参数,param是希望发送的request,remoteId是指远程server对应的Id。函数的返回就是response(也是继承自writable)。所以说,这是一个同步调用,一旦call函数返回,那么response也就拿到了。

    call函数的具体实现一会再看,先介绍Client中两个重要的内部类:

    1.  private class Call {  
    2.    int id;                                       // call id  
    3.    Writable param;                               // parameter  
    4.    Writable value;                               // value, null if error  
    5.    IOException error;                            // exception, null if value  
    6.    boolean done;                                 // true when call is done  
    7.    protected Call(Writable param) {  
    8.      this.param = param;  
    9.      synchronized (Client.this) {  
    10.        this.id = counter++;  
    11.      }  
    12.    }  
    13.    protected synchronized void callComplete() {  
    14.      this.done = true;  
    15.      notify();                                 // notify caller  
    16.    }  
    17. //.........  
    18.      
    19.    public synchronized void setValue(Writable value) {  
    20.      this.value = value;  
    21.      callComplete();  
    22.    }  
    23.      
    24.    public synchronized Writable getValue() {  
    25.      return value;  
    26.    }  
    27.  }  

    call这个类对应的就是一次异步请求。它的几个成员变量:
    id: 这个就是之前提过的,对于每一个request都需要分配一个唯一标示符,这样接收到response后才能知道到底对应哪个request;
    param: 需要发送到server的request;
    value: 从server发送过来的response;
    error: 可能发生的异常(比如网络读写错误,server挂了,等等);
    done:  表示这个call是否成功完成了,即是否接收到了response;

     

    1. private class Connection extends Thread {  
    2.     private InetSocketAddress server;             // server ip:port  
    3.       
    4.     // .........  
    5.       
    6.     private Socket socket = null;                 // connected socket  
    7.     private DataInputStream in;  
    8.     private DataOutputStream out;  
    9.       
    10. //............      
    11.     // currently active calls  
    12.     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();  
    13.     
    14. // .......  
    15.    private synchronized boolean addCall(Call call) {  
    16.       if (shouldCloseConnection.get())  
    17.         return false;  
    18.       calls.put(call.id, call);  
    19.       notify();  
    20.       return true;  
    21. }  
    22.     private void receiveResponse() {  
    23.       if (shouldCloseConnection.get()) {  
    24.         return;  
    25.       }  
    26.       touch();  
    27.         
    28.       try {  
    29.         int id = in.readInt();                    // try to read an id  
    30.         if (LOG.isDebugEnabled())  
    31.           LOG.debug(getName() + " got value #" + id);  
    32.         Call call = calls.get(id);  
    33.         int state = in.readInt();     // read call status  
    34.         if (state == Status.SUCCESS.state) {  
    35.           Writable value = ReflectionUtils.newInstance(valueClass, conf);  
    36.           value.readFields(in);                 // read value  
    37.           call.setValue(value);  
    38.           calls.remove(id);  
    39.         } else if (state == Status.ERROR.state) {  
    40.           call.setException(new RemoteException(WritableUtils.readString(in),  
    41.                                                 WritableUtils.readString(in)));  
    42.           calls.remove(id);  
    43.         } else if (state == Status.FATAL.state) {  
    44.           // Close the connection  
    45.           markClosed(new RemoteException(WritableUtils.readString(in),   
    46.                                          WritableUtils.readString(in)));  
    47.         }  
    48.       } catch (IOException e) {  
    49.         markClosed(e);  
    50.       }  
    51.     }  
    52.       
    53.     public void run() {  
    54.       if (LOG.isDebugEnabled())  
    55.         LOG.debug(getName() + ": starting, having connections "   
    56.             + connections.size());  
    57.       try {  
    58.         while (waitForWork()) {//wait here for work - read or close connection  
    59.           receiveResponse();  
    60.         }  
    61.       } catch (Throwable t) {  
    62.         LOG.warn("Unexpected error reading responses on connection " + this, t);  
    63.         markClosed(new IOException("Error reading responses", t));  
    64.       }  
    65.         
    66.       close();  
    67.         
    68.       if (LOG.isDebugEnabled())  
    69.         LOG.debug(getName() + ": stopped, remaining connections "  
    70.             + connections.size());  
    71.     }  
    72.     public void sendParam(Call call) {  
    73.       if (shouldCloseConnection.get()) {  
    74.         return;  
    75.       }  
    76.       DataOutputBuffer d=null;  
    77.       try {  
    78.         synchronized (this.out) {  
    79.           if (LOG.isDebugEnabled())  
    80.             LOG.debug(getName() + " sending #" + call.id);  
    81.             
    82.           //for serializing the  
    83.           //data to be written  
    84.           d = new DataOutputBuffer();  
    85.           d.writeInt(call.id);  
    86.           call.param.write(d);  
    87.           byte[] data = d.getData();  
    88.           int dataLength = d.getLength();  
    89.           out.writeInt(dataLength);      //first put the data length  
    90.           out.write(data, 0, dataLength);//write the data  
    91.           out.flush();  
    92.         }  
    93.       } catch(IOException e) {  
    94.         markClosed(e);  
    95.       } finally {  
    96.         //the buffer is just an in-memory buffer, but it is still polite to  
    97.         // close early  
    98.         IOUtils.closeStream(d);  
    99.       }  
    100.     }    
    101. }  

    Connection这个类要比之前的Call复杂得多,所以我省略了很多这里不会被讨论的代码。
    Connection对应于一个连接,即一个socket。但同时,它又继承自Thread,所有它本身又对应于一个线程。可以看出,在Hadoop的RPC中,一个连接对应于一个线程。先看他的成员变量:
    server: 这是远程server的地址;
    socket: 对应的socket;
    in / out: socket的输入流和输出流;
    calls: 重要的成员变量。它是一个hash表, 维护了这个connection正在进行的所有call和它们对应的id之间的关系。当读取到一个response后,就通过id在这张表中找到对应的call;
    再看看它的run()函数。这是Connection这个线程的启动函数,我贴的代码中这个函数没做任何的删减,你可以发现,刨除一些冗余代码,这个函数其实就只做了一件事:receiveResponse,即等待接收response。

     

    OK。回到call()这个函数,看看它到底做了什么:

    1. public Writable call(Writable param, ConnectionId remoteId)    
    2.       throws InterruptedException, IOException {  
    3.     Call call = new Call(param);  
    4.     Connection connection = getConnection(remoteId, call);  
    5.     connection.sendParam(call);                 // send the parameter  
    6.     boolean interrupted = false;  
    7.     synchronized (call) {  
    8.       while (!call.done) {  
    9.         try {  
    10.           call.wait();                           // wait for the result  
    11.         } catch (InterruptedException ie) {  
    12.           // save the fact that we were interrupted  
    13.           interrupted = true;  
    14.         }  
    15.       }  
    16.       if (interrupted) {  
    17.         // set the interrupt flag now that we are done waiting  
    18.         Thread.currentThread().interrupt();  
    19.       }  
    20.       if (call.error != null) {  
    21.         if (call.error instanceof RemoteException) {  
    22.           call.error.fillInStackTrace();  
    23.           throw call.error;  
    24.         } else { // local exception  
    25.           throw wrapException(remoteId.getAddress(), call.error);  
    26.         }  
    27.       } else {  
    28.         return call.value;  
    29.       }  
    30.     }  
    31.   }  
    首先,它创建了一个新的call(这个call是Call类的实体,注意和call()函数的区分),然后根据remoteId找到对应的connection(Client类中维护了一个connection pool),然后调用connection.sendParam()。从前面找到这个函数,你会发现它就是将request写入到socket,发送出去。
    但值得一提的是,它使用的write是最普通的blocking IO,也是同步IO(后面会看到,它读取response也是用的blcoking IO,所以,hadoop RPC虽然是异步机制,但是采用的是同步blocking IO,所以,异步消息机制还采用什么样的IO机制是没有关系的)。
    接下来,调用了call.wait(),将线程阻塞在这里。直到在某个地方调用了call.notify(),它才重新运行起来,然后一通判断后返回call.value,即接收到的response。

    所以,剩下的问题是,到底是哪调用了call.notify()?
    回到connection的receiveResponse函数:
    首先,它从socket的输入流中读到一个id,然后根据这个id找到对应的call,调用call.setValue将从socket中读取的response放入到call的value中,然后调用calls.remove(id)将这个call从队列中移除。这里要注意的是call.setValue,这个函数将value设置好之后,调用了call.notify()!

    好了,让我们再重头将流程捋一遍:
    这里其实有两个线程,一个线程是调用Client.call(),希望向远程server发送请求的线程,另外一个线程就是connection对应的那个线程。当然,虽然有两个线程,但server对应的只有一个socket。第一个线程创建call,然后调用call.sendParam将request通过这个socket发送出去;而第二个线程不断的从socket中读取response。因此,request的发送和response的接收被分隔到不同的线程中执行,而且这两个线程之间关于socket的读写并没有任何的同步机制,因此我认为这个RPC是异步消息机制实现的,只不过通过call.wait()/call.notify()使得对外的接口看上去像是同步。

    好了,Hadoop的RPC介绍完了(虽然我略掉了很多内容,比如timeout机制我这里就没写),说说我个人的评价吧。我认为,Hadoop的这个设计还是挺巧妙的,底层采用的是异步机制,但对外的接口提供的又是一般人比较习惯的同步方式。但是,我觉着缺点不是没有,一个问题是一个链接就要产生一个线程,这个如果是在几千台的cluster中,仍然会带来巨大的线程context switch的开销;另一个问题是对于同一个remote server只有一个socket来进行数据的发送和接收,这样的设计网络的吞吐量很有可能上不去。
    展开全文
  • 上篇说了半天,却回避了一个重要的问题:为什么要用异步呢,它有什么样的好处?坦率的说,我对这点的认识不是太深刻(套句俗语,只可意会,不可言传)。还是举个例子吧: 比如Client向Server发送一个...

    上篇说了半天,却回避了一个重要的问题:为什么要用异步呢,它有什么样的好处?坦率的说,我对这点的认识不是太深刻(套句俗语,只可意会,不可言传)。还是举个例子吧:
    比如Client向Server发送一个request,Server收到后需要100ms的处理时间,为了方便起见,我们忽略掉网络的延迟,并且,我们认为Server端的处理能力是无穷大的。在这个use case下,如果采用同步机制,即Client发送request -> 等待结果 -> 继续发送,那么,一个线程一秒钟之内只能够发送10个request,如果希望达到10000 request/s的发送压力,那么Client端就需要创建1000个线程,而这么多线程的context switch就成为client的负担了。而采用异步机制,就不存在这个问题了。Client将request发送出去后,立即发送下一个request,理论上,它能够达到网卡发送数据的极限。当然,同时需要有机制不断的接收来自Server端的response。

    以上的例子其实就是这篇的主题,异步的消息机制,基本的流程是这样的:

    如果仔细琢磨的话,会发现这个流程中有两个很重要的问题需要解决:
    1. 当client接收到response后,怎样确认它到底是之前哪个request的response呢?
    2. 如果发送一个request后,这个request对应的response由于种种原因(比如server端出问题了)一直没有返回。client怎么能够发现类似这样长时间没有收到response的request呢?

    对于第一个问题,一般会尝试给每个request分配一个独一无二的ID,返回的Response会同时携带这个ID,这样就能够将request和response对应上了。
    对于第二个问题,需要有一个timeout机制,对于每一个request都有一个定时器,如果到指定时间仍然没有返回结果,那么会触发timeout操作。多说一句,timeout机制其实对于涉及网络的同步机制也是非常有必要的,因为有可能client与server之间的链接坏了,在极端情况下,client会被一直阻塞住。

    纸上谈兵了这么久,还是看一个实际的例子。我在这里用Hadoop的RPC代码举例。这里需要事先说明的是,Hadoop的RPC对外的接口其实是同步的,但是,RPC的内部实现其实是异步消息机制。多说无益,直接看代码吧(讨论的所有代码都在org.apache.hadoop.ipc.Client.java 里):

    这就是Client.java对外提供的接口。一共有两个参数,param是希望发送的request,remoteId是指远程server对应的Id。函数的返回就是response(也是继承自writable)。所以说,这是一个同步调用,一旦call函数返回,那么response也就拿到了。

    call函数的具体实现一会再看,先介绍Client中两个重要的内部类:

    call这个类对应的就是一次异步请求。它的几个成员变量:
    id: 这个就是之前提过的,对于每一个request都需要分配一个唯一标示符,这样接收到response后才能知道到底对应哪个request;
    param: 需要发送到server的request;
    value: 从server发送过来的response;
    error: 可能发生的异常(比如网络读写错误,server挂了,等等);
    done:  表示这个call是否成功完成了,即是否接收到了response;

     

    Connection这个类要比之前的Call复杂得多,所以我省略了很多这里不会被讨论的代码。
    Connection对应于一个连接,即一个socket。但同时,它又继承自Thread,所有它本身又对应于一个线程。可以看出,在Hadoop的RPC中,一个连接对应于一个线程。先看他的成员变量:
    server: 这是远程server的地址;
    socket: 对应的socket;
    in / out: socket的输入流和输出流;
    calls: 重要的成员变量。它是一个hash表, 维护了这个connection正在进行的所有call和它们对应的id之间的关系。当读取到一个response后,就通过id在这张表中找到对应的call;
    再看看它的run()函数。这是Connection这个线程的启动函数,我贴的代码中这个函数没做任何的删减,你可以发现,刨除一些冗余代码,这个函数其实就只做了一件事:receiveResponse,即等待接收response。

     

    OK。回到call()这个函数,看看它到底做了什么:

    首先,它创建了一个新的call(这个call是Call类的实体,注意和call()函数的区分),然后根据remoteId找到对应的connection(Client类中维护了一个connection pool),然后调用connection.sendParam()。从前面找到这个函数,你会发现它就是将request写入到socket,发送出去。
    但值得一提的是,它使用的write是最普通的blocking IO,也是同步IO(后面会看到,它读取response也是用的blcoking IO,所以,hadoop RPC虽然是异步机制,但是采用的是同步blocking IO,所以,异步消息机制还采用什么样的IO机制是没有关系的)。
    接下来,调用了call.wait(),将线程阻塞在这里。直到在某个地方调用了call.notify(),它才重新运行起来,然后一通判断后返回call.value,即接收到的response。

    所以,剩下的问题是,到底是哪调用了call.notify()?
    回到connection的receiveResponse函数:
    首先,它从socket的输入流中读到一个id,然后根据这个id找到对应的call,调用call.setValue将从socket中读取的response放入到call的value中,然后调用calls.remove(id)将这个call从队列中移除。这里要注意的是call.setValue,这个函数将value设置好之后,调用了call.notify()!

    好了,让我们再重头将流程捋一遍:
    这里其实有两个线程,一个线程是调用Client.call(),希望向远程server发送请求的线程,另外一个线程就是connection对应的那个线程。当然,虽然有两个线程,但server对应的只有一个socket。第一个线程创建call,然后调用call.sendParam将request通过这个socket发送出去;而第二个线程不断的从socket中读取response。因此,request的发送和response的接收被分隔到不同的线程中执行,而且这两个线程之间关于socket的读写并没有任何的同步机制,因此我认为这个RPC是异步消息机制实现的,只不过通过call.wait()/call.notify()使得对外的接口看上去像是同步。

    好了,Hadoop的RPC介绍完了(虽然我略掉了很多内容,比如timeout机制我这里就没写),说说我个人的评价吧。我认为,Hadoop的这个设计还是挺巧妙的,底层采用的是异步机制,但对外的接口提供的又是一般人比较习惯的同步方式。但是,我觉着缺点不是没有,一个问题是一个链接就要产生一个线程,这个如果是在几千台的cluster中,仍然会带来巨大的线程context switch的开销;另一个问题是对于同一个remote server只有一个socket来进行数据的发送和接收,这样的设计网络的吞吐量很有可能上不去。(一家之言,欢迎指正)

    未完待续~

    展开全文
  • 异步消息的传递-回调机制 软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用、回调和异步调用。同步调用是一种阻塞式调用,调用方要等待对方执行完毕才返回,它是一种单向...

    异步消息的传递-回调机制

    软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用、回调和异步调用。同步调用是一种阻塞式调用,调用方要等待对方执行完毕才返回,它是一种单向调用;回调是一种双向调用模式,也就是说,被调用方在接口被调用时也会调用对方的接口;异步调用是一种类似消息或事件的机制,不过它的调用方向刚好相反,接口的服务在收到某种讯息或发生某种事件时,会主动通知客户方(即调用客户方的接口)。回调和异步调用的关系非常紧密,通常我们使用回调来实现异步消息的注册,通过异步调用来实现消息的通知。同步调用是三者当中最简单的,而回调又常常是异步调用的基础,因此,下面我们着重讨论回调机制在不同软件架构中的实现。

    陈家朋 (japen@vip.sina.com), 系统架构师和技术顾问, 杭州迈可行通信技术有限公司

    2003 年 3 月 01 日

    • expand内容

    1 什么是回调

    软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用、回调和异步调用。同步调用是一种阻塞式调用,调用方要等待对方执行完毕才返回,它是一种单向调用;回调是一种双向调用模式,也就是说,被调用方在接口被调用时也会调用对方的接口;异步调用是一种类似消息或事件的机制,不过它的调用方向刚好相反,接口的服务在收到某种讯息或发生某种事件时,会主动通知客户方(即调用客户方的接口)。回调和异步调用的关系非常紧密,通常我们使用回调来实现异步消息的注册,通过异步调用来实现消息的通知。同步调用是三者当中最简单的,而回调又常常是异步调用的基础,因此,下面我们着重讨论回调机制在不同软件架构中的实现。

    对于不同类型的语言(如结构化语言和对象语言)、平台(Win32、JDK)或构架(CORBA、DCOM、WebService),客户和服务的交互除了同步方式以外,都需要具备一定的异步通知机制,让服务方(或接口提供方)在某些情况下能够主动通知客户,而回调是实现异步的一个最简捷的途径。

    对于一般的结构化语言,可以通过回调函数来实现回调。回调函数也是一个函数或过程,不过它是一个由调用方自己实现,供被调用方使用的特殊函数。

    在面向对象的语言中,回调则是通过接口或抽象类来实现的,我们把实现这种接口的类成为回调类,回调类的对象成为回调对象。对于象C++或Object Pascal这些兼容了过程特性的对象语言,不仅提供了回调对象、回调方法等特性,也能兼容过程语言的回调函数机制。

    Windows平台的消息机制也可以看作是回调的一种应用,我们通过系统提供的接口注册消息处理函数(即回调函数),从而实现接收、处理消息的目的。由于Windows平台的API是用C语言来构建的,我们可以认为它也是回调函数的一个特例。

    对于分布式组件代理体系CORBA,异步处理有多种方式,如回调、事件服务、通知服务等。事件服务和通知服务是CORBA用来处理异步消息的标准服务,他们主要负责消息的处理、派发、维护等工作。对一些简单的异步处理过程,我们可以通过回调机制来实现。

    下面我们集中比较具有代表性的语言(C、Object Pascal)和架构(CORBA)来分析回调的实现方式、具体作用等。

    2 过程语言中的回调(C)

    2.1 函数指针

    回调在C语言中是通过函数指针来实现的,通过将回调函数的地址传给被调函数从而实现回调。因此,要实现回调,必须首先定义函数指针,请看下面的例子:

    void Func(char *s);// 函数原型
    void (*pFunc) (char *);//函数指针

    可以看出,函数的定义和函数指针的定义非常类似。

    一般的化,为了简化函数指针类型的变量定义,提高程序的可读性,我们需要把函数指针类型自定义一下。

    typedef void(*pcb)(char *);

    回调函数可以象普通函数一样被程序调用,但是只有它被当作参数传递给被调函数时才能称作回调函数。

    被调函数的例子:

    void GetCallBack(pcb callback)
    {
    /*do something*/
    }
    用户在调用上面的函数时,需要自己实现一个pcb类型的回调函数:
    void fCallback(char *s) 
    {
    /* do something */
    } 
    然后,就可以直接把fCallback当作一个变量传递给GetCallBack,
    GetCallBack(fCallback);

    如果赋了不同的值给该参数,那么调用者将调用不同地址的函数。赋值可以发生在运行时,这样使你能实现动态绑定。

    2.2 参数传递规则

    到目前为止,我们只讨论了函数指针及回调而没有去注意ANSI C/C++的编译器规范。许多编译器有几种调用规范。如在Visual C++中,可以在函数类型前加_cdecl,_stdcall或者_pascal来表示其调用规范(默认为_cdecl)。C++ Builder也支持_fastcall调用规范。调用规范影响编译器产生的给定函数名,参数传递的顺序(从右到左或从左到右),堆栈清理责任(调用者或者被调用者)以及参数传递机制(堆栈,CPU寄存器等)。

    将调用规范看成是函数类型的一部分是很重要的;不能用不兼容的调用规范将地址赋值给函数指针。例如:

    // 被调用函数是以int为参数,以int为返回值
    __stdcall int callee(int); 
    // 调用函数以函数指针为参数
    void caller( __cdecl int(*ptr)(int)); 
    // 在p中企图存储被调用函数地址的非法操作
    __cdecl int(*p)(int) = callee; // 出错

    指针p和callee()的类型不兼容,因为它们有不同的调用规范。因此不能将被调用者的地址赋值给指针p,尽管两者有相同的返回值和参数列

    2.3 应用举例

    C语言的标准库函数中很多地方就采用了回调函数来让用户定制处理过程。如常用的快速排序函数、二分搜索函数等。

    快速排序函数原型:

    void qsort(void *base, size_t nelem, size_t width, 
      int (_USERENTRY *fcmp)(const void *, const void *));
    二分搜索函数原型:
    void *bsearch(const void *key, const void *base, size_t nelem,
    	size_t width, int (_USERENTRY *fcmp)(const void *, const void *));

    其中fcmp就是一个回调函数的变量。

    下面给出一个具体的例子:

    #include <stdio.h>
    #include <stdlib.h>
    int sort_function( const void *a, const void *b);
    int list[5] = { 54, 21, 11, 67, 22 };
    int main(void)
    {
       int  x;
       qsort((void *)list, 5, sizeof(list[0]), sort_function);
       for (x = 0; x < 5; x++)
          printf("%i\n", list[x]);
       return 0;
    }
    int sort_function( const void *a, const void *b)
    {
       return *(int*)a-*(int*)b;
    }

    2.4 面向对象语言中的回调(Delphi)

    Dephi与C++一样,为了保持与过程语言Pascal的兼容性,它在引入面向对象机制的同时,保留了以前的结构化特性。因此,对回调的实现,也有两种截然不同的模式,一种是结构化的函数回调模式,一种是面向对象的接口模式。

    2.4.1 回调函数

    回调函数类型定义:

    type
       TCalcFunc=function (a:integer;b:integer):integer;

    按照回调函数的格式自定义函数的实现,如

    function Add(a:integer;b:integer):integer
    begin
      result:=a+b;
    end;
    function Sub(a:integer;b:integer):integer
    begin
      result:=a-b;
    end;

    回调的使用

    function Calc(calc:TcalcFunc;a:integer;b:integer):integer

    下面,我们就可以在我们的程序里按照需要调用这两个函数了

    c:=calc(add,a,b);//c=a+b
    c:=calc(sub,a,b);//c=a-b

    2.4.2 回调对象

    什么叫回调对象呢,它具体用在哪些场合?首先,让我们把它与回调函数对比一下,回调函数是一个定义了函数的原型,函数体则交由第三方来实现的一种动态应用模式。要实现一个回调函数,我们必须明确知道几点:该函数需要那些参数,返回什么类型的值。同样,一个回调对象也是一个定义了对象接口,但是没有具体实现的抽象类(即接口)。要实现一个回调对象,我们必须知道:它需要实现哪些方法,每个方法中有哪些参数,该方法需要放回什么值。

    因此,在回调对象这种应用模式中,我们会用到接口。接口可以理解成一个定义好了但是没有实现的类,它只能通过继承的方式被别的类实现。Delphi中的接口和COM接口类似,所有的接口都继承与IInterface(等同于IUnknow),并且要实现三个基本的方法QueryInterface, _AddRef, 和_Release。

    • 定义一个接口 
      type IShape=interface(IInterface)
      	procedure Draw;
      end
    • 实现回调类 
      type TRect=class(TObject,IShape)
      	protected
            function QueryInterface(const IID: TGUID; out Obj): HResult; stdcall;
            function _AddRef: Integer; stdcall;
      function _Release: Integer; stdcall;
          public
      	  procedure Draw;
      end;
      type TRound=class(TObject,IShape)
      	protected
            function QueryInterface(const IID: TGUID; out Obj): HResult; stdcall;
            function _AddRef: Integer; stdcall;
      function _Release: Integer; stdcall;
          public
      	  procedure Draw;
      end;
    • 使用回调对象 
      procedure MyDraw(shape:IShape);
      var 
      shape:IShape;
      begin
      shape.Draw; 
      end;

    如果传入的对象为TRect,那么画矩形;如果为TRound,那么就为圆形。用户也可以按照自己的意图来实现IShape接口,画出自己的图形:

    MyDraw(Trect.Create);
    MyDraw(Tround.Create);

    2.4.3 回调方法

    回调方法(Callback Method)可以看作是回调对象的一部分,Delphi对windows消息的封装就采用了回调方法这个概念。在有些场合,我们不需要按照给定的要求实现整个对象,而只要实现其中的一个方法就可以了,这是我们就会用到回调方法。

    回调方法的定义如下:

    TNotifyEvent = procedure(Sender: TObject) of object; 
    TMyEvent=procedure(Sender:Tobject;EventId:Integer) of object;

    TNotifyEvent 是Delphi中最常用的回调方法,窗体、控件的很多事件,如单击事件、关闭事件等都是采用了TnotifyEvent。回调方法的变量一般通过事件属性的方式来定义,如TCustomForm的创建事件的定义:

    property OnCreate: TNotifyEvent read FOnCreate write FOnCreate stored IsForm;

    我们通过给事件属性变量赋值就可以定制事件处理器。

    用户定义对象(包含回调方法的对象):

    type TCallback=Class
        procedure ClickFunc(sender:TObject);
    end;
    procedure Tcallback.ClickFunc(sender:TObject);
    begin
      showmessage('the caller is clicked!');
    end;

    窗体对象:

    type TCustomFrm=class(TForm)
      public
    	procedure RegisterClickFunc(cb:procedure(sender:Tobject) of object);
    end;
    procedure TcustomFrm..RegisterClickFunc(cb:TNotifyEvent);
    begin
      self.OnClick=cb;
    end;

    使用方法:

    var
      frm:TcustomFrm;
    begin
      frm:=TcustomFrm.Create(Application);
      frm.RegisterClickFunc(Tcallback.Create().ClickFunc);
    end;

    3 回调在分布式计算中的应用(CORBA)

    3.1 回调接口模型

    CORBA的消息传递机制有很多种,比如回调接口、事件服务和通知服务等。回调接口的原理很简单,CORBA客户和服务器都具有双重角色,即充当服务器也是客户客户。

    回调接口的反向调用与正向调用往往是同时进行的,如果服务端多次调用该回调接口,那么这个回调接口就变成异步接口了。因此,回调接口在CORBA中常常充当事件注册的用途,客户端调用该注册函数时,客户函数就是回调函数,在此后的调用中,由于不需要客户端的主动参与,该函数就是实现了一种异步机制。

    从CORBA规范我们知道,一个CORBA接口在服务端和客户端有不同的表现形式,在客户端一般使用桩(Stub)文件,服务端则用到框架(Skeleton)文件,接口的规格采用IDL来定义。而回调函数的引入,使得服务端和客户端都需要实现一定的桩和框架。下面是回调接口的实现模型:

    3.1.1 范例
    1.1 范例

    下面给出了一个使用回调的接口文件,服务端需要实现Server接口的框架,客户端需要实现CallBack的框架:

    module cb
    {
    	interface CallBack;
    	interface Server;
    interface CallBack 
    {
        	void OnEvent(in long Source,in long msg);
    };
      	interface Server 
    {
        	long RegisterCB(in CallBack cb);
    		void UnRegisterCB(in long hCb);
    };
    };

    客户端首先通过同步方式调用服务端的接口RegistCB,用来注册回调接口CallBack。服务端收到该请求以后,就会保留该接口引用,如果发生某种事件需要向客户端通知的时候就通过该引用调用客户方的OnEvent函数,以便对方及时处理。

    本文源码 下载

    展开全文
  • 2、发送消息和处理消息 二、实战案例 1、Handler对象在新启动的子线程发送消息 2、在主线程中创建Handler、并获取处理消息 3、demo 演示效果 三、总结说 1、Handler基本用法 2、Handler能解决的常见问题? ....
  • 举例说同步异步阻塞非阻塞机制

    千次阅读 2014-09-18 01:32:05
     同步/异步: 消息通知机制相关=>需要自己关注还是提供callback  阻塞/非阻塞:等待消息时的状态=>是否可以干别的事情   可以相互组合: 同步阻塞: 劫匪看着衰男把钱装好,不能干其他...
  • 在实际应用中经常会遇到比较耗时任务的处理,比如网络连接,数据库操作等情况时,如果这些操作都是放在主线程(UI线程)中,则会造成UI的假死现象,...关于Android中的消息机制,大家可以学习下这篇文章:Android消息
  • 异步消息的传递-回调机制 作者:陈家朋 (japen@vip.sina.com) 文章来源:IBM转载自:http
  • 1. 什么是回调软件模块之间总是存在着一定的接口,从调用方式上,可以...异步调用是一种类似消息或事件的机制,不过它的调用方向刚好相反,接口的服务在收到某种讯息或发生某种事件时,会主动通知客户方(即调用客户方
  • AsyncTask生来就是处理一些后台的比较耗时的任务,给用户带来良好用户体验的,从编程的语法上显得优雅了许多,不再需要子线程和Handler就可以完成异步操作并且刷新用户界面。Android官网总结的关于Handler类的两个...
  • Linux异步机制

    千次阅读 2015-08-18 00:26:00
    Linux异步机制 还没有细细整理。 什么是异步通知:很简单,一旦设备准备好,就主动通知应用程序,这种情况下应用程序就不需要查询设备状态, 特像硬件上常提的“中断的概念”。 比较准确的说法其实应该...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 38,494
精华内容 15,397
关键字:

异步消息机制举例