精华内容
下载资源
问答
  • Thrift 异步模式

    千次阅读 2018-02-02 10:29:15
    但是在一些情况下(例如大数据量同步),如果可以使用异步模式,可以优化程序结构和提高模块性能。 thrift 有提供一套异步模式模式供我们使用,我们跟往常一样来编写一个thrift 协议文件。 namespace cpp ...

    我们广泛使用thrift作为我们内部接口调用的RPC框架,而且基本上都是使用多线程请求等待应答的同步模式 。但是在一些情况下(例如大数据量同步),如果可以使用异步模式,可以优化程序结构和提高模块性能。

    thrift 有提供一套异步模式模式供我们使用,我们跟往常一样来编写一个thrift 协议文件。

    namespace cpp example
    service Twitter {
       string sendString(1:string data);
    }
    不同的是,我们需要加入cpp:cob_type 来生成代码。

    thrift -r -strict –gen cpp:cob_style -o ./ test.thrift

    生成的代码文件表和之前的基本相同,但在Twitter.cpp 和Twitter.h 文件中增加了异步客户端和异步服务器使用的类。

    $ tree gen-cpp 
    |– Test_constants.cpp 
    |– Test_constants.h 
    |– Test_types.cpp 
    |– Test_types.h 
    |– Twitter.cpp 
    |– Twitter.h 
    |–Twitter_server.skeleton.cpp 
    |-Twitter_async_server.skeleton.cpp

    用户只要关心在Twitter.h 中的TwitterCobClient、TwitterCobSvIf和TwitterAsyncProcessor这三个类。

    Thrift 异步Client
    异步客户端代码有TwitterCobClient 以及它继承的类。

    class TwitterCobClient : virtual public TwitterCobClIf {
     public:
      TwitterCobClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, ::apache::thrift::protocol::TProtocolFactory* protocolFactory) :
        channel_(channel),
        itrans_(new ::apache::thrift::transport::TMemoryBuffer()),
        otrans_(new ::apache::thrift::transport::TMemoryBuffer()),
        piprot_(protocolFactory->getProtocol(itrans_)),
        poprot_(protocolFactory->getProtocol(otrans_)) {
        iprot_ = piprot_.get();
        oprot_ = poprot_.get();
      }
      boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {
        return channel_;
      }
      virtual void completed__(bool /* success */) {}
      void sendString(tcxx::function<void(TwitterCobClient* client)> cob, const std::string& data);
      void send_sendString(const std::string& data);
      void recv_sendString(std::string& _return);
     protected:
      boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;
      boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;
      boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;
      boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
      boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
      ::apache::thrift::protocol::TProtocol* iprot_;
      ::apache::thrift::protocol::TProtocol* oprot_;
    };
    从源文件上看,通过类实现发现:

    completed__(bool /* success */)是虚函数,用于通知用户数据接收完成;
    sendString函数带有回调参数 function <void(TwitterCobClient* client)> cob,用于数据接收时回调,这是异步的特点;
    send_sendString和recv_sendString分别用于写数据到输出缓存和从输入缓存读数据
    列表内容拥有TAsyncChannel,异步功能的核心在于TAsyncChannel,它是用于回调函数注册和异步收发数据;
    Transport采用TMemoryBuffer,TMemoryBuffer是用于程序内部之间通信用的,在这里起到读写缓存作用

    下面看看关键函数 sendString的实现
    void  TwitterCobClient::sendString(t cxx::function<v  oid(TwitterCobClient* client)> cob, const std::string& data)
    {
      send_sendString(data);
      channel_->sendAndRecvMessage(tcxx::bind(cob, this), otrans_.get(), itrans_.get());
    }
    send_sendString函数是想缓冲区(TMemoryBuffer)写入数据, 而sendString 则通过调用TAsyncChannel的sendAndRecvMessage接口注册回调函数。

    TAsyncChannel作为接口类定义了三个接口函数。
    /**
       * Send a message over the channel.
       */
      virtual void sendMessage(const VoidCallback& cob,
        apache::thrift::transport::TMemoryBuffer* message) = 0;
    
    
      /**
       * Receive a message from the channel.
       */
      virtual void recvMessage(const VoidCallback& cob,
        apache::thrift::transport::TMemoryBuffer* message) = 0;
    
    
      /**
       * Send a message over the channel and receive a response.
       */
      virtual void sendAndRecvMessage(const VoidCallback& cob,
        apache::thrift::transport::TMemoryBuffer* sendBuf,
        apache::thrift::transport::TMemoryBuffer* recvBuf);
    TAsyncChannel目前为止(0.9.1版本)只有一种客户端实现类TEvhttpClientChannel,顾名思义它是基于libevent和http协议实现的。 使用libevent的方法就不在这里累赘了,主要看下sendAndRecvMessage的实现。
    void TEvhttpClientChannel::sendAndRecvMessage(
        const VoidCallback& cob,
        apache::thrift::transport::TMemoryBuffer* sendBuf,
        apache::thrift::transport::TMemoryBuffer* recvBuf) {
      cob_ = cob;// 绑定回调函数
      recvBuf_ = recvBuf;
      struct evhttp_request* req = evhttp_request_new(response, this);
      uint8_t* obuf;
      uint32_t sz;
      sendBuf->getBuffer(&obuf, &sz);
      rv = evbuffer_add(req->output_buffer, obuf, sz);
    
    
      rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());// 发送http 请求
    }

    从sendAndRecvMessage实现可看出,TEvhttpClientChannel是用采用http协议来与服务器通信,后面介绍异步server时会发现,同样采用是http协议,它们使用的http库是libevent库的evhttp。


    通过向evhttp_request中注册相应回调函数respones和传入回调实例本身的指针,在相应时候回调函数中调用TEvhttpClientChannel实例的finish接口完成数据接收,并写入缓存中,供应用层获取使用。 
    看下回调函数response 的实现:
    /* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) {
      TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg;
      try {
        self->finish(req);
      } catch (std::exception& e) {
        // don't propagate a C++ exception in C code (e.g. libevent)
        std::cerr << "TEvhttpClientChannel::response exception thrown (ignored): " << e.what()
                  << std::endl;
      }
    }
    Thrift 异步server
    异步server关心另外两个类:TwitterCobSvIf和TwitterAsyncProcessor。很明显TwitterCobSvIf是要用户继承实现的,它与同步TwitterSvIf不同的地方是成员函数多一个cob回调函数,在实现TwitterSvIf时,需要调用cob。示例如下:
    class TwitterCobSvNull : virtual public TwitterCobSvIf {
     public:
      virtual ~TwitterCobSvNull() {}
      void sendString(tcxx::function<void(std::string const& _return)> cob, const std::string& /* data */) {
        std::string _return;
        return cob(_return);
      }
    };



    那么这个cob是什么函数,哪里注册的?这在thrift lib库里的TEvhttpServer和TAsyncProtocolProcessor类里可找到答案,其中TEvhttpServer是异步server,传输是采用http协议,与异步client对上。


    先看看TEvhttpServer实现,同样采用event_base来异步收发数据,收到数据时,回调request函数。

    void TEvhttpServer::request(struct evhttp_request* req, void* self) {
      try {
        static_cast<TEvhttpServer*>(self)->process(req);
      } catch(std::exception& e) {
        evhttp_send_reply(req, HTTP_INTERNAL, e.what(), 0);
      }
    }
    void TEvhttpServer::process(struct evhttp_request* req) {
      RequestContext* ctx = new RequestContext(req);
      return processor_->process(     // 这里的processor_正是TAsyncProtocolProcessor
          std::tr1::bind(
            &TEvhttpServer::complete,   // 注册complete
            this,
            ctx,
            std::tr1::placeholders::_1),
          ctx->ibuf,
          ctx->obuf);
    }
    
    
    void TEvhttpServer::complete(RequestContext* ctx, bool success) {
      std::auto_ptr<RequestContext> ptr(ctx);
    
    
      int code = success ? 200 : 400;
      const char* reason = success ? "OK" : "Bad Request";
    
      int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");
      struct evbuffer* buf = evbuffer_new();
        uint8_t* obuf;
        uint32_t sz;
        ctx->obuf->getBuffer(&obuf, &sz);   // 从输出缓冲读数据
        int ret = evbuffer_add(buf, obuf, sz);
    
    
      evhttp_send_reply(ctx->req, code, reason, buf);   // 发送数据
      }
    接着看TAsyncProtocolProcessor的process实现
    void TAsyncProtocolProcessor::process(
        std::tr1::function<void(bool healthy)> _return,
        boost::shared_ptr<TBufferBase> ibuf,
        boost::shared_ptr<TBufferBase> obuf) {
      boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf));
      boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf));
      return underlying_->process(     // underlying_是生成代码里的TwitterAsyncProcessor
          std::tr1::bind(
            &TAsyncProtocolProcessor::finish,   
            _return,  // compere函数
            oprot,
            std::tr1::placeholders::_1),
          iprot, oprot);
    }
    
    
    /* static */ void TAsyncProtocolProcessor::finish(
        std::tr1::function<void(bool healthy)> _return,
        boost::shared_ptr<TProtocol> oprot,
        bool healthy) {
      (void) oprot;
      // This is a stub function to hold a reference to oprot.
      return _return(healthy);  // 回调compere函数
    }
    最后看TwitterAsyncProcessor::process,它先写fname,mtype, seqid然后调用process_fn,process_fn选择调用合理的处理函数(如process_sendString),看process_sendString实现:

     void TwitterAsyncProcessor::process_sendString(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)
    
    
    void (TwitterAsyncProcessor::*return_fn)(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, const std::string& _return)    =
        &TwitterAsyncProcessor::return_sendString;              // return_sendString正是我们要找的cob函数
        iface_->sendString(                      // iface_是TwitterCobSvIf的具体类,用户实现的
          std::tr1::bind(return_fn, this, cob, seqid, oprot, ctx, std::tr1::placeholders::_1),    // cob 是 finish函数
          args.data);
    
    
    }
    上面return_sendString是我们要找的cob函数,该函数将用户处理的结果写入输出冲缓,并发送给client。

    下面实现了一个异步客户端和异步服务端 
    采用异步时,必须采用http 传输层。


    异步客户端的实现
    demo_async_client.cc
    #include <string>
    #include "boost/shared_ptr.hpp"
    #include <thrift/Thrift.h>
    #include <thrift/protocol/TProtocol.h>
    #include <thrift/transport/TSocket.h>
    #include <thrift/transport/TTransportUtils.h>
    #include <thrift/concurrency/ThreadManager.h>
    #include <thrift/transport/TBufferTransports.h>
    #include <thrift/server/TServer.h>
    #include <thrift/async/TAsyncChannel.h>
    #include <thrift/async/TEvhttpClientChannel.h>
    #include "common/thrift/Twitter.h"
    #include "boost/function.hpp"
    #include "boost/bind.hpp"
    #include <event.h>
    #include <stdio.h>
    using namespace apache::thrift;
    using namespace apache::thrift::protocol;
    using namespace apache::thrift::transport;
    using std::string;
    using boost::shared_ptr;
    using namespace example;
    using namespace apache::thrift::async;
    
    class testClient : public TwitterCobClient
    {
    public:
      testClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, TProtocolFactory* protocolFactory)
          : TwitterCobClient(channel, protocolFactory)
      { };
    
    
      virtual void completed__(bool success)
      {
        if (success)
        {
                    printf("respone : %s \n", res.c_str());   // 输出返回结果
        }
        else
        {
          printf("failed to respone\n");
        }
        fflush(0);
      };
    
    
       string res;
    };
    
    
    //callback function
    static void my_recv_sendString(TwitterCobClient *client){
      client->recv_sendString(dynamic_cast<testClient*>(client)->res);
    }
    
    
    static void sendString(testClient & client){
    printf("snedstring start\n");
    std::function<void(TwitterCobClient*client)>cob = bind(&my_recv_sendString,_1);
    client.sendString(cob,"Hello");
    printf("sendstring end\n");
    }
    
    static void DoSimpleTest(const std::string & host, int port){
     printf("running SimpleTset(%s, %d)..\n", host.c_str(),port);
     event_base* base = event_base_new();
       boost::shared_ptr< ::apache::thrift::async::TAsyncChannel>  channel1( new TEvhttpClientChannel( host, "/", host.c_str(), port, base  ) );
    
    
      testClient client1( channel1,  new TBinaryProtocolFactory() );
    
    
      sendString(client1);   // 发送第一个请求
    
    
      boost::shared_ptr< ::apache::thrift::async::TAsyncChannel>  channel2( new TEvhttpClientChannel( host, "/", host.c_str(), port, base  ) );
    
    
      testClient client2( channel2,  new TBinaryProtocolFactory() );
    
    
      sendString(client2);  // 发送第二个请求
    
    
      event_base_dispatch(base);
    
    
      event_base_free(base);
    
    
      printf( "done DoSimpleTest().\n" );
    }
    
    
    int main( int argc, char* argv[] )
    {
      DoSimpleTest( "localhost", 14488 );
      return 0;
    
    
    }
    
    

    异步服务端的实现
    demo_async_serv.cc
    #include <string>
    #include "boost/shared_ptr.hpp"
    #include <thrift/Thrift.h>
    #include <thrift/protocol/TProtocol.h>
    #include <thrift/transport/TSocket.h>
    #include <thrift/transport/TTransportUtils.h>
    #include <thrift/concurrency/ThreadManager.h>
    #include <thrift/transport/TBufferTransports.h>
    #include <thrift/server/TServer.h>
    #include <thrift/async/TAsyncChannel.h>
    #include <thrift/async/TEvhttpClientChannel.h>
    #include "common/thrift/Twitter.h"
    #include <thrift/async/TAsyncProtocolProcessor.h>
    #include <thrift/async/TEvhttpServer.h>
    
    using namespace apache::thrift;
    using namespace apache::thrift::protocol;
    using namespace apache::thrift::transport;
    using std::string;
    using namespace boost;
    using namespace example;
    using namespace apache::thrift::async;
    
    
    class TwitterAsyncHandler : public TwitterCobSvIf {
     public:
      TwitterAsyncHandler() {
        // Your initialization goes here
      }
    
    
      void sendString(std::function<void(std::string const& _return)> cob, const std::string& data) {
        printf("sendString rec:%s\n", data.c_str());  // 输出收到的数据
        std::string _return = "world";   // 返回world字符串给客户端
        return cob(_return);
      }
    
    
    };
    
    int main(int argc, char **argv) {
      shared_ptr<TAsyncProcessor> underlying_pro(new TwitterAsyncProcessor( shared_ptr<TwitterCobSvIf>(new TwitterAsyncHandler()) ) );
      shared_ptr<TAsyncBufferProcessor> processor( new TAsyncProtocolProcessor( underlying_pro, shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()) ) );
    
    
      TEvhttpServer server(processor, 14488);
      server.serve();
      return 0;
    }
    

    参考
    http://blog.csdn.net/whycold/article/details/10973 
    http://tech.uc.cn/?p=2668553
    展开全文
  • mediacodec异步模式

    2018-06-12 17:00:09
    * 异步模式解码器 * @param surface */ public void setupAsynDecoder(Surface surface){ try { mediaCodec = MediaCodec.createDecoderByType("video/avc"); } catch (IOException e) { ...
    	/**
    	 * 异步模式解码器
    	 * @param surface
    	 */
    	public void setupAsynDecoder(Surface surface){
    		try {
    			mediaCodec = MediaCodec.createDecoderByType("video/avc");
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    
    		mediaCodec.setCallback(new MediaCodec.Callback() {
    			@Override
    			public void onInputBufferAvailable(@NonNull MediaCodec codec, int index) {
    				int length = 0;
    				ByteBuffer inputBuffer = mediaCodec.getInputBuffer(index);
    				Log.i(TAG, "inputbufferIndex:" + index);
    				try {
    					byte[] frameBuffer = ParamUtil.vedioQueue.consume();
    					inputBuffer.clear();
    					if(frameBuffer[0]==0 && frameBuffer[1]==0 && frameBuffer[2]==0 && frameBuffer[3]==1){
    						inputBuffer.put(frameBuffer,0,frameBuffer.length);
    						length = frameBuffer.length;
    					}
    					else {
    						inputBuffer.put(frameBuffer,16,frameBuffer.length-16);
    						length = frameBuffer.length - 16;
    					}
    					mediaCodec.queueInputBuffer(index, 0, length, mCount, 0);
    					mCount = mCount +100;
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    
    			@Override
    			public void onOutputBufferAvailable(@NonNull MediaCodec codec, int index, @NonNull BufferInfo info) {
    				ByteBuffer outputBuffer = mediaCodec.getOutputBuffer(index);
    				MediaFormat bufferFormat = mediaCodec.getOutputFormat(index);
    				Log.i(TAG, "ouputbufferIndex:" + index);
    				mediaCodec.releaseOutputBuffer(index,true);
    			}
    
    			@Override
    			public void onError(@NonNull MediaCodec codec, @NonNull MediaCodec.CodecException e) {
    
    			}
    
    			@Override
    			public void onOutputFormatChanged(@NonNull MediaCodec codec, @NonNull MediaFormat format) {
    
    			}
    		});
    
    		MediaFormat mediaFormat = MediaFormat.createVideoFormat("video/avc",320, 240);
    		mediaCodec.configure(mediaFormat, surface, null, 0);
    
    		mediaCodec.start();
    
    	}

    展开全文
  • 啥是异步模式 kafka的生产者可以选择使用异步方式发送数据,所谓异步方式,就是我们调用 send() 方法,并指定一个回调函数, 服务器在返回响应时调用该函数。 kafka在客户端里暴露了两个send方法,我们可以自己选择...

    啥是异步模式

    kafka的生产者可以选择使用异步方式发送数据,所谓异步方式,就是我们调用 send() 方法,并指定一个回调函数, 服务器在返回响应时调用该函数。

    kafka在客户端里暴露了两个send方法,我们可以自己选择同步或者异步模式。我们来看一个kafka的生产者发送示例,有个直观的感受。这个示例是一个同步的模式。

    ProducerRecord<String, String> record = new ProducerRecord<>(“Kafka”, “Kafka_Products”, “测试”);//Topic Key Value
    try{
    Future future = producer.send(record);
    future.get();//获取执行结果
    } catch(Exception e) {
    e.printStackTrace();
    }
    

    我们从源码层面来继续看下。

    首先kafka定义了一个接口,

    在这里插入图片描述

    然后KafkaProducer实现了这两个方法,我们看下异步方法的实现逻辑。

    在这里插入图片描述

    可以看到最终是调用doSend方法,调用的时候传入一个回调。这个回调就是监听方法的执行结果的。

    异步模式也会阻塞的

    很多人会认为,既然是异步模式,不管结果是成功还是失败,肯定方法调用会马上返回的。那我只能告诉你,不好意思,不一定是这样。我自己就曾经踩过这个坑。

    我们当时有个业务流程需要在执行完成后发送kakfa消息给某个业务方,为了尽量减少影响我这个主流程的执行时间,采用了异步方式发送kafka消息。在使用中,因为配错了kafka的TOPIC信息,发现流程阻塞发送消息这里长达6秒(kafka默认的发送超时时间)。

    究竟为啥异步方式还会阻塞呢?我们继续看源码。

    在这里插入图片描述
    不管是同步模式还是异步模式,最终都会调用到doSend方法,注意看上图中的waitOnMetadata方法,我上面说的阻塞的情况就是阻塞在这个方法里。那我们继续看这个方法。

    在这里插入图片描述
    通过代码中的注释我们大概能了解这个方法的功能,不过我这里还是要解释下。(防止有人看不懂英文,哈哈)

    waitOnMetadata获取当前的集群元数据信息,如果缓存有,并且分区没有超过指定分区范围则缓存返回,否则触发更新,等待新的metadata。这个等待的操作在下面这行代码:

    metadata.awaitUpdate(version, remainingWaitMs);
    

    然后就继续跟喽,

    在这里插入图片描述
    这个方法很好理解,就是一直在等一个条件,这个条件达到了就返回,否则一直等待超时退出。而这个条件就是当前的版本号要大于上个版本号。

    那么谁来更新版本号呢?就是我们前面提到的sender线程。当我们的topic配置错误的时候导致metadata一直无法更新,然后一直等到超时。

    破案了!

    总结

    kafka的异步模式可以让我们在业务场景中发送消息时即刻返回,不必等待发送的结果。但是当metadata取不到时,发送的过程还是需要等待一直超时的。

    程序员是一个尤其需要不断学习的工种,平时养成阅读源码的习惯,不光能避免踩一些坑,还能在遇到问题是快递定位到问题的根源。

    展开全文
  • 四、基于事件的异步模式(设计层面) 基于事件的C#异步编程模式是比IAsyncResult模式更高级的一种异步编程模式,也被用在更多的场合。该异步模式具有以下优点: · “在后台”执行耗时任务(例如下载和数据库操作...

    http://www.cnblogs.com/fish-li/archive/2011/10/23/2222013.html


    四、基于事件的异步模式(设计层面)

    基于事件的C#异步编程模式是比IAsyncResult模式更高级的一种异步编程模式,也被用在更多的场合。该异步模式具有以下优点:

    ·                  “在后台”执行耗时任务(例如下载和数据库操作),但不会中断您的应用程序。

    ·                  同时执行多个操作,每个操作完成时都会接到通知(在通知中可以区分是完成了哪个操作)。

    ·                  等待资源变得可用,但不会停止(“挂起”)您的应用程序。

    ·                  使用熟悉的事件和委托模型与挂起的异步操作通信。

    对于相对简单的应用程序可以直接用 .Net 2.0 新增的 BackgroundWorker 组件来很方便的实现,对于更复杂的异步应用程序则需要自己实现一个符合基于事件的C#异步编程模式的类。在实现基于事件的异步模式的设计前,需要了解基于事件的异步模式的实现原理是什么。基于事件的异步模式需要以下三个类型的帮助。

    AsyncOperation:提供了对异步操作的生存期进行跟踪的功能,包括操作进度通知和操作完成通知,并确保在正确的线程或上下文中调用客户端的事件处理程序。

    public void Post(SendOrPostCallback d,Object arg);

    public void PostOperationCompleted(SendOrPostCallback d,Object arg);

    通过在异步辅助代码中调用Post方法把进度和中间结果报告给用户,如果是取消异步任务或提示异步任务已完成,则通过调用PostOperationCompleted方法结束异步操作的跟踪生命期。在PostOperationCompleted方法调用后,AsyncOperation对象变得不再可用,再次访问将引发异常。在此有个问题:在该异步模式中,通过AsyncOperation的Post函数来通知进度的时候,是如何使SendOrPostCallback委托在UI线程上执行的?针对该问题下文有具体分析。

     

    AsyncOperationManager:为AsyncOperation对象的创建提供了便捷方式,通过CreateOperation方法可以创建多个AsyncOperation实例,实现对多个异步操作进行跟踪。

     

    WindowsFormsSynchronizationContext:该类继承自SynchronizationContext类型,提供 Windows 窗体应用程序模型的同步上下文。该类型是基于事件异步模式通信的核心。之所以说该类型是基于事件异步模式的通信核心,是因为该类型解决了“保证SendOrPostCallback委托在UI线程上执行”的问题。它是如何解决的?请看AsyncOperation类型的Post方法的实现:

    [csharp] view plain copy
    1. /// <summary>  
    2.    /// AsyncOperation类型的Post方法的实现  
    3.    /// </summary>  
    4. public void Post(SendOrPostCallback d, object arg)  
    5. {  
    6.     this.VerifyNotCompleted();  
    7.     this.VerifyDelegateNotNull(d);  
    8.     this.syncContext.Post(d, arg);  
    9. }  


     

    在AsyncOperation类型的Post方法中,直接调用了SynchronizationContext类型的Post方法,再看该Post方法的实现:

    [csharp] view plain copy
    1. /// <summary>  
    2.    /// WindowsFormsSynchronizationContext类型的Post方法的实现  
    3.    /// </summary>  
    4. public override void Post(SendOrPostCallback d, object state)  
    5. {  
    6.     if (this.controlToSendTo != null)  
    7.     {  
    8.         this.controlToSendTo.BeginInvoke(d, new object[] { state }); //此处保证了SendOrPostCallBack委托在UI线程上执行  
    9.   
    10.     }  
    11. }  


     

    有以上三个类型(AsyncOpertion,AsyncOperationManager和SynchronizationContext)作为基础,实现基于事件的异步模式的进度通知和完成通知就轻松多了。下面用一个基于事件的异步模型的例子来结束本文章。

    [csharp] view plain copy
    1. using System;  
    2. using System.Collections.Generic;  
    3. using System.Text;  
    4. using System.ComponentModel;  
    5. using System.Collections.Specialized;  
    6. using System.Threading;  
    7.   
    8. namespace test  
    9. {  
    10.     /// <summary>  
    11.     /// 任务1的进度通知代理  
    12.     /// </summary>  
    13.     /// <param name="sender"></param>  
    14.     /// <param name="e"></param>  
    15.     public delegate void Work1ProgressChangedEventHandler(object sender, Work1ProgressChangedEventArgs e);  
    16.     /// <summary>  
    17.     /// 任务1的进度通知参数  
    18.     /// </summary>  
    19.     /// <param name="sender"></param>  
    20.     /// <param name="e"></param>  
    21.     public delegate void Work1CompletedEventHandler(object sender, Work1CompletedEventArgs e);  
    22.   
    23.     public class BasedEventAsyncWorker  
    24.     {  
    25.         private delegate void WorkerEventHandler(int maxNumber, AsyncOperation asyncOp);  
    26.         private HybridDictionary userStateToLifetime = new HybridDictionary();  
    27.   
    28.         public BasedEventAsyncWorker()  
    29.         { }  
    30.  
    31.         #region DoWork1的基于事件的异步调用  
    32.         public void DoWork1Async(object userState, int maxNumber)  
    33.         {  
    34.             AsyncOperation asyncOp = AsyncOperationManager.CreateOperation(userState);  
    35.   
    36.             //userStateToLifetime有可能会同时被多线程访问,在此需要lock进行同步处理  
    37.             lock (userStateToLifetime.SyncRoot)  
    38.             {  
    39.                 if (userStateToLifetime.Contains(userState))  
    40.                 {  
    41.                     throw new ArgumentException(  
    42.                         "userState parameter must be unique",  
    43.                         "userState");  
    44.                 }  
    45.   
    46.                 userStateToLifetime[userState] = asyncOp;  
    47.             }  
    48.   
    49.             //异步开始任务1  
    50.             WorkerEventHandler workerDelegate = new WorkerEventHandler(DoWork1);  
    51.             workerDelegate.BeginInvoke(maxNumber, asyncOp, nullnull);  
    52.         }  
    53.   
    54.         private void DoWork1(int maxNumber, AsyncOperation asyncOp)  
    55.         {  
    56.             Exception e = null;  
    57.   
    58.             //判断该userState的任务仍在处理中  
    59.             if (!TaskCanceled(asyncOp.UserSuppliedState))  
    60.             {  
    61.                 try  
    62.                 {  
    63.                     int n = 0;  
    64.                     int percentage = 0;  
    65.                     while (n < maxNumber && !TaskCanceled(asyncOp.UserSuppliedState))  
    66.                     {  
    67.                         Thread.Sleep(100); //模拟耗时操作  
    68.                         percentage = (int)((float)n / (float)maxNumber * 100);  
    69.                         Work1ProgressChangedEventArgs progressChanageArgs =  
    70.                             new Work1ProgressChangedEventArgs(maxNumber, percentage, asyncOp.UserSuppliedState);  
    71.                         //任务1的进度通知  
    72.                         asyncOp.Post(new SendOrPostCallback(Work1ReportProgressCB), progressChanageArgs);   
    73.                         n++;  
    74.                     }  
    75.                 }  
    76.                 catch (Exception ex)  
    77.                 {  
    78.                     e = ex;  
    79.                 }  
    80.             }  
    81.   
    82.             this.Work1Complete(e, TaskCanceled(asyncOp.UserSuppliedState), asyncOp);  
    83.         }  
    84.   
    85.         private void Work1Complete(Exception exception, bool canceled, AsyncOperation asyncOp)  
    86.         {  
    87.             if (!canceled)  
    88.             {  
    89.                 lock (userStateToLifetime.SyncRoot)  
    90.                 {  
    91.                     userStateToLifetime.Remove(asyncOp.UserSuppliedState);  
    92.                 }  
    93.             }  
    94.   
    95.             Work1CompletedEventArgs e = new Work1CompletedEventArgs(exception, canceled, asyncOp.UserSuppliedState);  
    96.   
    97.             //通知指定的任务已经完成  
    98.             asyncOp.PostOperationCompleted(new SendOrPostCallback(Work1CompleteCB), e);  
    99.   
    100.             //调用 PostOperationCompleted 方法来结束异步操作的生存期。  
    101.             //为某个特定任务调用此方法后,再调用其相应的 AsyncOperation 对象会引发异常。  
    102.         }  
    103.   
    104.         private void Work1ReportProgressCB(object state)  
    105.         {  
    106.             Work1ProgressChangedEventArgs e = state as Work1ProgressChangedEventArgs;  
    107.   
    108.             OnWork1ProgressChanged(e);  
    109.         }  
    110.   
    111.         private void Work1CompleteCB(object state)  
    112.         {  
    113.             Work1CompletedEventArgs e = state as Work1CompletedEventArgs;  
    114.   
    115.             OnWork1Completed(e);  
    116.         }  
    117.  
    118.         #region Work1的进度通知和任务完成的事件  
    119.         public event Work1ProgressChangedEventHandler Work1ProgressChanged;  
    120.         protected virtual void OnWork1ProgressChanged(Work1ProgressChangedEventArgs e)  
    121.         {  
    122.             Work1ProgressChangedEventHandler temp = this.Work1ProgressChanged;  
    123.             if (temp != null)  
    124.             {  
    125.                 temp(this, e);  
    126.             }  
    127.         }  
    128.   
    129.         public event Work1CompletedEventHandler Work1Completed;  
    130.         protected virtual void OnWork1Completed(Work1CompletedEventArgs e)  
    131.         {  
    132.             Work1CompletedEventHandler temp = this.Work1Completed;  
    133.             if (temp != null)  
    134.             {  
    135.                 temp(this, e);  
    136.             }  
    137.         }   
    138.         #endregion   
    139.         #endregion  
    140.   
    141.         /// <summary>  
    142.         /// 取消指定userState的任务执行  
    143.         /// </summary>  
    144.         /// <param name="userState"></param>  
    145.         public void CancelAsync(object userState)  
    146.         {  
    147.             AsyncOperation asyncOp = userStateToLifetime[userState] as AsyncOperation;  
    148.             if (asyncOp != null)  
    149.             {  
    150.                 lock (userStateToLifetime.SyncRoot)  
    151.                 {  
    152.                     userStateToLifetime.Remove(userState);  
    153.                 }  
    154.             }  
    155.         }  
    156.   
    157.         /// <summary>  
    158.         /// 判断指定userState的任务是否已经被结束。返回值:true 已经结束; false 还没有结束  
    159.         /// </summary>  
    160.         /// <param name="userState"></param>  
    161.         /// <returns></returns>  
    162.         private bool TaskCanceled(object userState)  
    163.         {  
    164.             return (userStateToLifetime[userState] == null);  
    165.         }  
    166.   
    167.   
    168.     }  
    169.   
    170.     public class Work1ProgressChangedEventArgs :ProgressChangedEventArgs  
    171.     {  
    172.         private int totalWork = 1;  
    173.   
    174.         public Work1ProgressChangedEventArgs(int totalWork, int progressPercentage, object userState)  
    175.             : base(progressPercentage, userState)  
    176.         {  
    177.             this.totalWork = totalWork;  
    178.         }  
    179.   
    180.         /// <summary>  
    181.         /// Work1的总工作量  
    182.         /// </summary>  
    183.         public int TotalWork  
    184.         {  
    185.             get  
    186.             {  
    187.                 return totalWork;  
    188.             }  
    189.         }  
    190.     }  
    191.   
    192.     public class Work1CompletedEventArgs : AsyncCompletedEventArgs  
    193.     {  
    194.         public Work1CompletedEventArgs(Exception e, bool canceled, object state)  
    195.             : base(e, canceled, state)  
    196.         {  
    197.         }  
    198.   
    199.     }  
    200.   
    201. }  
    展开全文
  • 半同步/半异步模式、半同步/半反应堆模式、相对高效的半同步/半异步模式、领导者/追随者模式
  • ajax同步模式和异步模式的区别就是在于,xhr.open()方法第3个参数传入的bool值的区别,xhr.open()方法第3个参数的作用就是设置此次请求是否采用异步模式执行,默认为true ,那么同步模式xhr.open()方法第3个参数值...
  • Python3.5+中用异步模式操作文件的方法,async file io线程开启文件读取异步模式使用已编写好的第三方插件-aiofiles,支持异步模式插件可支持的属性参考: 平常使用的file操作模式为同步,并且为线程阻塞。当程序I/O...
  • 直接上图了,规范性先忽略。 ASIO同步模式。这个相对简单 ASIO异步模式
  • RabbitMQ之confirm异步模式

    千次阅读 2019-06-11 17:17:06
    RabbitMQ之confirm异步模式简介 * Channel 对象提供的ConfirmListener()回调方法只包含 * deliveryTag(当前Chanel发出的消息序号),我们需要自己 * 为每一个Channel维护一个unconfirm的消息序号集合,每publish...
  • 半同步半异步模式

    千次阅读 2014-07-03 10:16:36
    半同步半异步模式 一个架构模式,清晰的结构,高效并发的I/O 译者: 英文原文: http://www.cs.wustl.edu/~schmidt/PDF/HS-HA.pdf 摘要 这篇文字介绍了半同步半异步模式,这个模式运用在复杂的并行系统中,...
  • Spring中的异步模式

    千次阅读 2016-12-26 13:19:54
    什么是异步模式 要知道什么是异步模式,就先要知道什么是同步模式,先看最典型的同步模式: (图1) 浏览器发起请求,Web服务器开一个线程处理,处理完把处理结果返回浏览器。好像没什么好说的了,绝大...
  • 事件驱动异步模式

    千次阅读 2015-08-26 21:35:23
    事件驱动异步模式   前言   啥叫事件?啥叫驱动?异步又是啥玩意?这些字一个一个的我都认识,但是练起来我就不知道啥意思了,别急,往下看.   在下一篇文章中,我会专门介绍并发,同步,异步以及事件...
  • Creo二次开发异步模式配置

    千次阅读 2015-08-01 06:31:43
    异步模式是Creo二次开发的另外一种形式,被广泛应用于一些外部调用中。其配置过程如下: (1) 创建MFC程序(对话框程序都可以了) (2) 配置工程常规选项 (3) 配置VC目录 (4) 配置附加依赖...
  • GPU: 多GPU训练的同步模式和异步模式

    千次阅读 2018-10-05 21:11:13
    常用的并行化深度学习模型训练方式有两种:同步模式和异步模式。 下面将介绍这两种模式的工作方式及其优劣。 如下图,深度学习模型的训练是一个迭代的过程。 在每一轮迭代中,前向传播算法会根据当前参数的取值...
  • 初探Thrift客户端异步模式

    千次阅读 2016-09-18 15:12:21
    初探Thrift客户端异步模式 Posted by  tangzheng  on 2014 年 1 月 6 日 Tweet 4 背景 在某项目中,我们广泛使用thrift作为我们内部接口调用的RPC框架,而且...
  • Java笔记-RabbitMQ中生产者端confirm模式(异步模式)

    千次阅读 多人点赞 2019-07-06 16:58:36
    异步模式:Channel对象提供的ConfirmListener()回调方法只包含deliverTag(当前Channel发出的消息序列号),需要自己为每一个Channel维护一个cunconfirm的消息序列号集合,每个publish数据,集合中元素+1,回调一次...
  • 半同步/半异步模式

    2017-02-15 11:19:03
    半同步/半异步模式的结构遵循分层模式,包括四层:同步服务层 同步服务层:完成高层处理服务。同步层中的服务在独立的操作情况下可以阻塞现成或进程。 异步服务层:进行低层处理服务。这些低层处理服务通常由一个...
  • 虽然Ajax可以设置为同步或者异步模式,但是在某些情况下(比如用jsonp实现ajax跨域调用)只能让Ajax工作在异步模式下。但是很多时候异步模式会带来一些编程上的麻烦,所以我写了个用于同步异步ajax的函数,在这里和...
  • 高效的半同步/半异步模式的实现

    千次阅读 2018-03-16 21:19:57
    先介绍一下半同步/半异步模式:首先半同步/半异步模式中的同步和异步和前面的IO模型中的同步和异步是完全不用的概念。在IO模型中,同步和异步区分的是内核向应用程序通知的是何种IO事件(是就绪事件还是完成事件),...
  • RPC 工具 --Thrift(二) Thrift 异步模式

    千次阅读 2016-07-10 11:15:40
    Thrift异步模式我们广泛使用thrift作为我们内部接口调用的RPC框架,而且基本上都是使用多线程请求等待应答的同步模式 。但是在一些情况下(例如大数据量同步),如果可以使用异步模式,可以优化程序结构和提高模块...
  • 并发模式是指I/O处理单元和多个逻辑单元之间协调完成任务的方法1、半同步/半异步模式1.1半同步/半异步模式【1】同步:程序完全按照代码顺序执行;异步:程序的执行需要由系统事件来驱动。常见的系统事件包括中断,...
  • 半同步半异步模式:出现原因: 异步线程执行效率高,但是编写复杂,同步线程效率低,但是逻辑简单。服务器要求好的实时性和同时处理多用户的请求,英雌采用两者结合的形式。 具体情况: 过程: 异步线程见听到...
  • 如果您正使用一些可能导致显著的延迟的操作编写类,请考虑通过实现 基于事件的异步模式概述 向类提供异步功能。 基于事件的异步模式提供了一个打包具有异步功能的类的标准化方式。 如果使用像 ...
  • 基于事件的异步模式提供了一种公开类的异步行为的模式。 引入此模式后,.NET Framework 定义了两种公开异步行为的模式:基于 System.IAsyncResult 接口的异步模式和基于事件的模式。 本主题介绍何时适合实现上述...
  • 高性能的关键:Spring MVC的异步模式

    千次阅读 2016-08-21 13:39:06
    我承认有些标题党了,不过话说这样其实也没错,关于“异步”处理的文章已经不少,代码例子也能找到很多,但我还是打算发表这篇我写了好长一段时间,却一直没发表的文章,以一个更简单的视角,把异步模式讲清楚。...
  • 异步模式的服务器源码 异步模式的客户端源码 运行效果截图
  • 基于事件的异步模式

    千次阅读 2010-10-14 16:24:00
    本篇学习了基于事件的异步模式。问题的提出:基于IAsyncResult接口的异步模式.在异步回调中,回调线程不同于调用线程。使用Windows窗体或WPF时,这就是个问题,因为Windows窗体和WPF控件绑定到一个线程上。对于每个...
  • Winsock异步模式之一select 选择模式

    千次阅读 2009-12-03 10:40:00
    最近在学习winsock的...为了巩固我的学习成果,同时也是为了给后来者提供一点点的帮助,我决定把我的学习心得以及对winsock五种异步模式的理解记录下来。当然我对自己的要求是:不求面面俱到,但要让大部分人看懂。 

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 443,934
精华内容 177,573
关键字:

异步模式