精华内容
下载资源
问答
  • Tornado异步原理详析

    2019-05-06 18:38:14
    Tornado异步原理详析 Tornado是什么 Tornado是一个用Python编写的异步HTTP服务器,同时也是一个web开发框架. Tornado优秀的大并发处理能力得益于它的web server从底层开始就自己实现了一整套基于epoll的单线程异步...

    Tornado异步原理详析

    Tornado是什么

    Tornado是一个用Python编写的异步HTTP服务器,同时也是一个web开发框架.
    Tornado优秀的大并发处理能力得益于它的web server从底层开始就自己实现了一整套基于epoll的单线程异步架构

    同步, 异步 编程差异

    • 对于同步阻塞型Web服务器,我们打个比方,将它比作一间饭馆,而Web请求就是来这家饭馆里吃饭的客人. 假设饭店里只能有20个座位,那么同事能够就餐的客人数量就是20, 剩下的客人被迫就在店门外等,如果客人吃的太慢了,那么外面的客人就等的不耐烦了,就会走掉(timeout).
    • 对于异步非阻塞型服务器,我们打另一个比方,将它比作一家超市,客人们想进就能进,前往货架拿他们想要的货物,然后再去收银台结账(callback),假设,这家超市只有20个收银台,却可以同时满足成百上千人的购物需求.和购物的时间长度比起来,结账的时间基本可以忽略不计.

    大部分web应用都是阻塞性质的,也就是说当一个请求被处理时,这个进程就会被挂起直至请求完成.
    假设你正在写一个需要请求一些来自其它服务器上的数据(比如数据库服务,调用其它http接口获取数据)的应用程序,这几个请求假设需要花费5秒钟,大多数的web开发框架中处理请求的代码:

    def handler_request(self, request):
        answ = self.remote_server.query(request)  # 耗时5秒
        request.write_response(answ)
    

    如果这些代码运行在单个线程中,你的服务器只能每5秒接收一个客户端的请求. 在这5秒钟的时间里,服务器不能干其它任何事情,所以,你的服务效率是每秒0.2个请求,这样的效率是不能接受的.

    大部分服务器会使用多线程技术来让服务器一次接收多个客服端的请求,我们假设你有20个线程,你将在性能上获取20倍的提高,所以现在你的服务器效率是每秒接受4个请求,但这还是太低了.
    当然,你可以通过不断的提高线程的数量来解决这个问题,但是,线程在内存和调用方面的开销是昂贵的,大多数Linux发行版中都是默认线程堆大小为8MB. 为每个打开的连接维护一个打的线程等待数据极易迅速耗光服务器的内存资源.可能这种提高线程数量的方式将永远不可能达到每秒100个请求的效率.

    如果使用异步IO(asynchronous IO AIO), 达到每秒上千个请求的效率是非常轻松的事情. 服务器请求处理的代码将被改成这样:

    def  handler_request(self, request):
        self.remote_server.query_async(request, self.response_received)
    def response_received(self, request, answ):  # 回调函数 耗时5秒
        request.write(answ)
    

    AIO思想是当我们在等待结果的时候不阻塞,转而我们给框架一个回调函数作为参数,让框架在收到结果的时候通过回调函数继续操作. 这样,服务器就可以被解放去接受其他客服端的请求了.

    IO复用 Epoll

    tornado.ioloop就是tornado web server异步最底层的实现.
    看ioloop之前,我们需要了解一些预备知识,有助于我们立即ioloop

    ioloop的实现基于epoll,什么是epoll?epoll是Linux内核作为处理大批量文件描述符儿做了改进的poll.
    那么什么又是poll?首先,我们回顾一下,socket通信的服务端,当它接受(accept)一个连接并建立通讯后(connection)就进行通信,而此时我们并不知道连接的客户端有没有信息发完. 这时候我们有两种选择:

    1. 一直在这里等待直到收发数据结束;
    2. 每隔一定时间来看看这里有没有数据;

    第一种办法虽然可以解决问题,但我们要注意的是对于一个线程/进程同事只能处理一个socket通信,其他连接只能被阻塞,显然这种方式在单进程情况下不现实.
    第二种办法要比第一种好一些,多个连接可以统一在一定时间内轮流看一遍里面有没有数据要读写,看上去我们可以处理多个连接了,这个方式就是poll/select的解决方案.看起来似乎解决了问题,但实际上,随着连接越来越多,轮询所花费的时间将越来越长,而服务器连接的socket大多不是活跃的,所以轮询所花费的大部分时间将是无用的.

    为了解决这个问题,epoll被创造出来,它的概念和poll类似,不过每次轮询时,他只会把有数据活跃的socket挑出来轮询,这样在有大量连接时轮询就节省了大量时间.

    对于epoll的操作,其实也很简单,只要四个API就可以完全操作它.

    1. epoll_create
      用来创建一个epoll描述符(就是创建了一个epoll)
    2. epoll_ctl
      对epoll事件操作,包括以下操作:
      EPOLL_CTL_ADD 添加一个新的epoll事件
      EPOLL_CTL_DEL 删除一个epoll事件
      EPOLL_CTL_MOD 改变一个事件的监听方式

    epoll监听的事件七种,而我们只需要关心其中的三种:
    EPOLLIN 缓冲区满,有数据可读(read)
    EPOLLOUT 缓冲区空,可写数据(write)
    EPOLLERR 发生错误 (error)

    1. epoll_wait
      就是让epoll开始工作,里面有个参数timeout,当设置为非0正整数时,会监听(阻塞)timeout秒;设置为0时立即返回,设置为-1时一直监听.
      在监听时有数据活跃的连接时其返回胡偶尔的文件句柄列表(此处为socket文件句柄).

    2. close
      关闭epoll
      IO复用详解可以参考另一篇文章: IO常见模型-详解io多路复用

    IOLoop模块

    让我们通过查看ioloop.py文件直接进入服务器的核心. 这个模块是异步机制的核心. 它包含了一系列已经打开的文件描述符(文件指针)和每个描述符的处理器(handlers).
    它的功能是选择那些已经准备好读写的文件描述符,然后调用他们各自的处理(一种IO多路服用的实现,select/epoll).
    可以通过调用add_handler()方法将一个socket加入IO循环中:

    """为文件描述符注册指定处理器(calback),当文件描述指定的事件发生"""
    def add_handler(self, fd, handler, events):
        self._handlers[fd] = handler
        self._impl.register(fd, events | self.ERROR)
    

    _handler这个字典类型的变量保存着文件描述符(其实就是socket)到当该文件描述符准备好时需要调用的方法的映射(在Tornado中,该方法被称为处理器).然后,文件描述符被注册到eopll列表中.Tornado关心三种类型的事件(指发生在文件描述上的事件):READ,WRITE和ERROR. 正如你所见,ERROR是默认为你自动添加的. self._impl是select.epoll()和select.select()两者中的一个
    现在让我们来看看实际的主循环,这段代码被放在了start()方法中:

    def start(self):
        """Starts the I/O loop.
        The loop wil run until one of the I/O handlerscalls stop(), which will make the loop stop after the current event iteration completes
        """
        self._running = True
        while True:  # 开始时间循环 Event Loop
            [...]
            if not self._running:
                break
            [...]
            try:
                event_pairs = self._impl.poll(poll_timeout)  # 通过epoll/select机制返回有事件返回的(fd: events)的键值对. 可能不是一个事件
            except Exception, e:
                if e.args == (4, "Interrupted system call"):
                    logging.warning("Interrupted system call", exc_info=1)
                    continue
                else:
                    raise
            # Pop one fd at a time from the set of pending fds and run
            # its handler. Since that handler may perform actions on
            # other file descriptors, there may be reentrant calls to 
            # this IOLoop that update self._events
            self._events.update(event_pairs)  # 更新所有准备好的事件列表
            while self._events:
                fd, events = self._events.popitem()  # 循环逐个弹出可以执行的socket和事件
                try:
                    self._handlers[fd](fd, events)  # 之前通过add_handler注册的fd和回调函数,到这里就可以执行相应的回调函数了
                except KeyboardInterrupt:
                    raise
                except OSError, e:
                    if e[0] == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        logging.error("Exception in I/O handler for fd %d", fd, exc_info=True)
                except:
                    logging.error("Exception in I/O handler for fd %d", fd, exc_info=True)
    

    这就是异步的核心组件IOLoop的核心工作,我们来看看它的工作流程:

    • 开始一个事件循环Event Loop, 用于检测被注册到这里的fd(非阻塞socket),如果有可执行事件发生,就执行相应的灰调函数
    • event_pairs = self_impl.poll(poll_timeout)通过epoll/select机制返回有时间发生的(fd:events)的键值对
    • self._events.udate(event_pairs) 更新所有准备好的事件列表
    • while self._events循环这个事件列表,循环这个弹出可以待执行的socket和事件
    • self._handlers[fd](fd, events)之前通过add_handler注册的fd和回调函数,到这里就可以执行相应的回调函数了

    通过上面介绍的add_handler注册socket->callbakc, 这个start()功能就是tornado开启的一个单线程时间IO循环,用于检测所有非阻塞socket的事件,只要被注册的socket时间发生了,就执行注册时的回调函数.
    具体到实际就是可以分为这两种情况:

    • 监听连接: 一开始创建的一个服务器端socket监听端口,等待客户端连接,这时通过setblocking(0)设置这个socket为非阻塞,然后add_handler(socket, handler_connection, READ)注册这个socket的可读事件,只有要新连接过来,就会出发事件,handler_connection这个回调函数就执行.
    • 请求其它数据时: 比如http_client.fetch()connected, recvfrom的socket都会设置成nonblocking非阻塞,同时add_handler注册, 等待事件发生,并调用回到函数.

    例子: 一个建议的服务器监听

    def connection_ready(sock, fd, events):
        while True:
            try: 
                connection, address = sock.accept()
            except socket.error as e:
                if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                    raise
                return
            connection.setblocking(0)
            handle_connection(connection, address)
    if __name__ == '__main__':
        sock = socket.socket(socket.Af_INET, socket.SOCKE_STREAM, 0)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(0)  # 把监听的socket设置为非阻塞
        sock.bind(("", prot))
        sock.listen(128)
    
        io_loop = tornado.ioloop.IOLoop.current()
        callback = functools.partial(connection_ready, sock)
        io_loop.add-handler(sock.fileno(), callback, io_loop.READ)  # 注册这个服务器端监听socket可读时间,同时注册这个回调函数
        io_loop.start()
    

    tornado 异步原理.jpg

    效率对比实例代码

    """异步抓取网页"""
    class AsyncHandler(RequestHandler):
        @asynchronous
        def get(self):
            http_client = AsyncHTTPClient()
            http_client.getch("http://www.163.com", callback=self.on_fetch)
        def on_fetch(self, response):
            print(response)
            self.write('done')
            self.finish()
    
    
    """同步抓取网页"""
    class SyncHandler(RequestHandler):
        def get(self):
            http_client = HTTPClient()
            response = http_client.fetch("http://www.163.com")
            print(response)
            self.write('done')
    

    进行压测测试

    # 异步代码压测结果
    Document Path:          /async_fetch/
    Document Length:        4 bytes
    
    Concurrency Level:      5
    Time taken for tests:   1.945 seconds
    Complete requests:      50
    Requests per second:    25.71 [#/sec] (mean)
    Time per request:       194.488 [ms] (mean)
    Time per request:       38.898 [ms] (mean, across all concurrent requests)
    
    # 同步代码压测结果
    Document Path:          /sync_fetch/
    Concurrency Level:      5
    Time taken for tests:   5.423 seconds
    Complete requests:      50
    Requests per second:    9.22 [#/sec] (mean)
    Time per request:       542.251 [ms] (mean)
    Time per request:       108.450 [ms] (mean, across all concurrent requests)
    

    可以看出异步比同步的性能高很多

    总结

    • Tornado的异步条件: 要使用异步,就必须把IO操作变成非阻塞的IO
    • Tornado的异步原理: 单线程的tornado打开一个IO时间循环,当碰到IO请求(新连接进来或者调用api获取数据),由于这些IO请求都是非阻塞的IO,都会把这些非阻塞的IO socket扔到一个socket管理器,所以,这里单线程的CPU只要发起一个网络IO请求,就不用挂起线程等待IO结果,这个单线程的事件继续循环,接受其它请求或者IO操作,如此循环.
    展开全文
  • Tornado异步原理

    2019-07-06 23:54:55
    Tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构。 同步、异步编程差异 对于同步阻塞型Web服务器,我们来打个比方,将它比作一间饭馆,而Web请求...

    Tornado是什么?

    Tornado是一个用Python编写的异步HTTP服务器,同时也是一个web开发框架。
    Tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构。

    同步、异步编程差异

    • 对于同步阻塞型Web服务器,我们来打个比方,将它比作一间饭馆,而Web请求就是来这家饭馆里吃饭的客人。假设饭馆店里只有20个座位,那么同时能够就餐的客人数也就是20,剩下的客人被迫就在店门外等,如果客人们吃的太慢了,那么外面的客人等得不耐烦了,就会走掉(timeout)。

    • 对于异步非阻塞型服务器,我们打另一个比方,将它比作一家超市,客人们想进就能进,前往货架拿他们想要的货物,然后再去收银台结账(callback),假设,这家超市只有20个收银台,却可以同时满足成百上千人的购物需求。和购物的时间长度比起来,结账的时间基本可以忽略不计。

    大部分Web应用都是阻塞性质的,也就是说当一个请求被处理时,这个进程就会被挂起直至请求完成。
    假设你正在写一个需要请求一些来自其他服务器上的数据(比如数据库服务,调用其他http 接口获取数据)的应用程序,这几个请求假设需要花费5秒钟,大多数的web开发框架中处理请求的代码:



     

    展开全文
  • [Python]Tornado异步原理详析

    千次阅读 2018-05-06 23:07:28
    Tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构。 同步、异步编程差异 对于同步阻塞型Web服务器,我们来打个比方,将它比作一间饭馆,而Web...

    Tornado是什么?

    Tornado是一个用Python编写的异步HTTP服务器,同时也是一个web开发框架。
    Tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构。

    同步、异步编程差异

    • 对于同步阻塞型Web服务器,我们来打个比方,将它比作一间饭馆,而Web请求就是来这家饭馆里吃饭的客人。假设饭馆店里只有20个座位,那么同时能够就餐的客人数也就是20,剩下的客人被迫就在店门外等,如果客人们吃的太慢了,那么外面的客人等得不耐烦了,就会走掉(timeout)。

    • 对于异步非阻塞型服务器,我们打另一个比方,将它比作一家超市,客人们想进就能进,前往货架拿他们想要的货物,然后再去收银台结账(callback),假设,这家超市只有20个收银台,却可以同时满足成百上千人的购物需求。和购物的时间长度比起来,结账的时间基本可以忽略不计。

    大部分Web应用都是阻塞性质的,也就是说当一个请求被处理时,这个进程就会被挂起直至请求完成。
    假设你正在写一个需要请求一些来自其他服务器上的数据(比如数据库服务,调用其他http 接口获取数据)的应用程序,这几个请求假设需要花费5秒钟,大多数的web开发框架中处理请求的代码:

    def handler_request(self, request):
        answ = self.remote_server.query(request) # 耗时5秒
        request.write_response(answ)
    

    如果这些代码运行在单个线程中,你的服务器只能每5秒接收一个客户端的请求。在这5秒钟的时间里,服务器不能干其他任何事情,所以,你的服务效率是每秒0.2个请求, 这样的效率时不能接受。

    大部分服务器会使用多线程技术来让服务器一次接收多个客户端的请求,我们假设你有20个线程,你将在性能上获得20倍的提高,所以现在你的服务器效率是每秒接受4个请求,但这还是太低了。
    当然,你可以通过不断地提高线程的数量来解决这个问题,但是,线程在内存和调度方面的开销是昂贵的,大多数Linux发布版中都是默认线程堆大小为8MB。为每个打开的连接维护一个大的线程池等待数据极易迅速耗光服务器的内存资源。可能这种提高线程数量的方式将永远不可能达到每秒100个请求的效率。

    如果使用异步IO(asynchronous IO AIO),达到每秒上千个请求的效率是非常轻松的事情。服务器请求处理的代码将被改成这样:

    def handler_request(self, request):
       self.remote_server.query_async(request, self.response_received)     
    def response_received(self, request, answ):    #回调函数 耗时5秒
       request.write(answ)
    

    AIO的思想是当我们在等待结果的时候不阻塞,转而我们给框架一个回调函数作为参数,让框架在收到结果的时候通过回调函数继续操作。这样,服务器就可以被解放去接受其他客户端的请求了。

    IO复用 Epoll

    tornado.ioloop 就是 tornado web server 异步最底层的实现。
    看 ioloop 之前,我们需要了解一些预备知识,有助于我们理解 ioloop。

    ioloop 的实现基于 epoll ,那么什么是 epoll? epoll 是Linux内核为处理大批量文件描述符而作了改进的 poll 。
    那么什么又是 poll ? 首先,我们回顾一下, socket 通信时的服务端,当它接受( accept )一个连接并建立通信后( connection )就进行通信,而此时我们并不知道连接的客户端有没有信息发完。 这时候我们有两种选择:

    1. 一直在这里等着直到收发数据结束;
    2. 每隔一定时间来看看这里有没有数据;

    第一种办法虽然可以解决问题,但我们要注意的是对于一个线程\进程同时只能处理一个 socket 通信,其他连接只能被阻塞,显然这种方式在单进程情况下不现实。

    第二种办法要比第一种好一些,多个连接可以统一在一定时间内轮流看一遍里面有没有数据要读写,看上去我们可以处理多个连接了,这个方式就是 poll / select 的解决方案。 看起来似乎解决了问题,但实际上,随着连接越来越多,轮询所花费的时间将越来越长,而服务器连接的 socket 大多不是活跃的,所以轮询所花费的大部分时间将是无用的。

    为了解决这个问题, epoll 被创造出来,它的概念和 poll 类似,不过每次轮询时,他只会把有数据活跃的 socket 挑出来轮询,这样在有大量连接时轮询就节省了大量时间。

    对于 epoll 的操作,其实也很简单,只要 4 个 API 就可以完全操作它。

    epoll_create

    用来创建一个 epoll 描述符( 就是创建了一个 epoll )

    epoll_ctl

    对epoll 事件操作,包括以下操作:
    EPOLL_CTL_ADD 添加一个新的epoll事件
    EPOLL_CTL_DEL 删除一个epoll事件
    EPOLL_CTL_MOD 改变一个事件的监听方式

    epoll监听的事件七种,而我们只需要关心其中的三种:
    EPOLLIN 缓冲区满,有数据可读(read)
    EPOLLOUT 缓冲区空,可写数据 (write)
    EPOLLERR 发生错误 (error)

    epoll_wait

    就是让 epoll 开始工作,里面有个参数 timeout,当设置为非 0 正整数时,会监听(阻塞) timeout 秒;设置为 0 时立即返回,设置为 -1 时一直监听。
    在监听时有数据活跃的连接时其返回活跃的文件句柄列表(此处为 socket 文件句柄)。

    close

    关闭 epoll

    IO复用详解可以参考我另外一篇文章: IO复用模型同步,异步,阻塞,非阻塞及实例详解

    IOLoop模块

    让我们通过查看ioloop.py文件直接进入服务器的核心。这个模块是异步机制的核心。它包含了一系列已经打开的文件描述符(文件指针)和每个描述符的处理器(handlers)。它的功能是选择那些已经准备好读写的文件描述符,然后调用它们各自的处理器(一种IO多路复用的实现,select / epoll)。
    可以通过调用add_handler()方法将一个socket加入IO循环中:

    """为文件描述符注册指定处理器(callback),当文件描述指定的事件发生"""    
    def add_handler(self, fd, handler, events):    
       self._handlers[fd] = handler   
       self._impl.register(fd, events | self.ERROR)
    

    _handlers这个字典类型的变量保存着文件描述符(其实就是socket)到当该文件描述符准备好时需要调用的方法的映射(在Tornado中,该方法被称为处理器)。然后,文件描述符被注册到epoll列表中。Tornado关心三种类型的事件(指发生在文件描述上的事件):READ,WRITE 和 ERROR。正如你所见,ERROR是默认为你自动添加的。
    self._impl是select.epoll()selet.select()两者中的一个
    现在让我们来看看实际的主循环,这段代码被放在了start()方法中:

    def start(self):
        """Starts the I/O loop.
        The loop will run until one of the I/O handlers calls stop(), which
        will make the loop stop after the current event iteration completes.
        """
        self._running = True
        while True: # 开始事件循环 Event Loop 
            [ ... ]
            if not self._running:
                break
            [ ... ]
            try:
                event_pairs = self._impl.poll(poll_timeout)  # 通过epoll/select机制返回有事件返回的(fd: events)的键值对
            except Exception, e:
                if e.args == (4, "Interrupted system call"):
                    logging.warning("Interrupted system call", exc_info=1)
                    continue
                else:
                    raise
            # Pop one fd at a time from the set of pending fds and run
            # its handler. Since that handler may perform actions on
            # other file descriptors, there may be reentrant calls to
            # this IOLoop that update self._events
            self._events.update(event_pairs) # 更新所有准备好的事件列表
            while self._events:
                fd, events = self._events.popitem() # 循环逐个弹出可以待执行的socket和事件
                try:
                    self._handlers[fd](fd, events) # 之前通过add_handler注册的fd和回调函数, 到这里就可以执行相对应的回调函数了
                except KeyboardInterrupt:
                    raise
                except OSError, e:
                    if e[0] == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        logging.error("Exception in I/O handler for fd %d",
                                      fd, exc_info=True)
                except:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
    

    这就是异步的核心组件IOLoop 的核心工作,我们来看看它的工作流程:

    • 开始一个事件循环 Event Loop ,用于监测被注册到这里的fd(非阻塞socket), 如果有执行事件发生,就执行相应回调函数
    • event_pairs = self._impl.poll(poll_timeout) 通过epoll/select机制返回有事件返回的(fd: events)的键值对
    • self._events.update(event_pairs) # 更新所有准备好的事件列表
    • while self._events: 循环这个事件列表,循环逐个弹出可以待执行的socket和事件
    • self._handlers[fd](fd, events) 之前通过add_handler注册的fd和回调函数, 到这里就可以执行相对应的回调函数了

    通过上面介绍的add_handler注册socket->callback,这个start()功能就是tornado开启的一个单线程事件IO循环,用于监测所有非阻塞socket的事件,只要被注册的socket事件发生了,就执行注册时的回调函数。
    具体到实际就是可以分为这两种情况:

    • 监听连接: 一开始创建的一个服务器端socket监听端口,等待客户端连接,这时通过setblocking(0)设置这个socket 非阻塞,然后add_handler(socket, handler_connection, READ) 注册这个socket的可读事件,只有要新连接过来,就会触发读事件,handler_connection这个回调函数就执行。
    • 请求其他数据时: 比如http_client.fetch()的connected,recvfrom的socket都会设置成nonblocking非阻塞,同时add_hanndler注册,等待事件发生,并调用回调函数。

    例子:一个建议的服务器监听:

    def connection_ready(sock, fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error as e:
                if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                    raise
                return
            connection.setblocking(0)
            handle_connection(connection, address)
    
    if __name__ == '__main__':
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(0) # 把监听的socket设置成非阻塞
        sock.bind(("", port))
        sock.listen(128)
    
        io_loop = tornado.ioloop.IOLoop.current()
        callback = functools.partial(connection_ready, sock)
        io_loop.add_handler(sock.fileno(), callback, io_loop.READ) # 注册这个服务器端监听socket可读事件,同时注册这个回调函数
        io_loop.start()
    
    tornado 异步原理.jpg

    效率对比实例代码

    
    """异步抓取网页"""
    class AsyncHandler(RequestHandler):
        @asynchronous
        def get(self):
            http_client = AsyncHTTPClient()
            http_client.fetch("http://www.163.com",
                              callback=self.on_fetch)
    
        def on_fetch(self, response):
            print response
            self.write('done')
            self.finish()
    
    
    """同步抓取网页"""
    class SyncHandler(RequestHandler):
        def get(self):
            http_client = HTTPClient()
            response = http_client.fetch("http://www.163.com")
            print response
            self.write('done')
    

    进行压测测试

    # 异步代码压测结果
    Document Path:          /async_fetch/
    Document Length:        4 bytes
    
    Concurrency Level:      5
    Time taken for tests:   1.945 seconds
    Complete requests:      50
    Requests per second:    25.71 [#/sec] (mean)
    Time per request:       194.488 [ms] (mean)
    Time per request:       38.898 [ms] (mean, across all concurrent requests)
    
    # 同步代码压测结果
    Document Path:          /sync_fetch/
    Concurrency Level:      5
    Time taken for tests:   5.423 seconds
    Complete requests:      50
    Requests per second:    9.22 [#/sec] (mean)
    Time per request:       542.251 [ms] (mean)
    Time per request:       108.450 [ms] (mean, across all concurrent requests)
    

    可以看出异步比同步的性能高很多

    总结

    • Tornado的异步条件:要使用到异步,就必须把IO操作变成非阻塞的IO。
    • Tornado的异步原理: 单线程的torndo打开一个IO事件循环, 当碰到IO请求(新链接进来 或者 调用api获取数据),由于这些IO请求都是非阻塞的IO,都会把这些非阻塞的IO socket 扔到一个socket管理器,所以,这里单线程的CPU只要发起一个网络IO请求,就不用挂起线程等待IO结果,这个单线程的事件继续循环,接受其他请求或者IO操作,如此循环。

    参考

    http://www.cnblogs.com/yiwenshengmei/archive/2011/06/08/understanding_tornado.html
    https://github.com/tornadoweb/tornado/blob/master/tornado/ioloop.py#L928
    http://blog.csdn.net/wyx819/article/details/45420017
    https://www.rapospectre.com/blog/34
    http://golubenco.org/understanding-the-code-inside-tornado-the-asynchronous-web-server-powering-friendfeed.html
    https://www.futures.moe/writings/introduction-for-tornado-async-programming.htm



    作者:cooffeelis
    链接:https://www.jianshu.com/p/de7f04e65618
    來源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
    展开全文
  • 来到这里, 就有点像tornado 异步协程的模型了: producer()类似IOLoop一直在循环,由它来产生事件,再跳出来consumer(),当让可以有N个consumer(),让他们处理。 producer()就是一个调度器,可以控制事件扔给哪个...

    协程定义:

    协程,又称微线程,纤程。英文名Coroutine。

    子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
    所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
    子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。
    协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
    注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。

    那和多线程比,协程有何优势?

    最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

    第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

    因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

    Python对协程的支持还非常有限,用在generator中的yield可以一定程度上实现协程。虽然支持不完全,但已经可以发挥相当大的威力了。

    来看例子:
    传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
    如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:

     

    # yield 的生产者消费者模型
    
    def consumer():
        r = 'i am start now!!!'
        while True:
            i = yield r
            print 'consuming task %s' % i
            r = '200 Done'
    
    
    def producer(c):
        start_up = c.next() # 或者c.send(None) 启动生成器, 遇到yield 返回 重新来到这里
        print 'start_up is %s' % start_up
        n = 5
        i = 0
        while i < n:
            i+=1
            print 'producing task is %s' % i
            res = c.send(i) # 生产了一个任务,通过 send(i) 把函数执行权切换到consumer,消费者接收任务处理, 此时consumer 的yield r 表达式等于send()的参数,即i=i
                            # 而send(i) 的返回值就由consumer的yield r产生,yield r 可以相当于return r 所以,res=“200 Done”
            print 'consumer done ,res: %s' % res
    
        c.close() # 不生产任务了,就关闭生成器
    
    
    c = consumer()
    producer(c)
    

    上面的协程运行流程是:

    1. c = consumer() 创建一个生成器,注意并不是执行一个函数,这里只会生成一个生成器对象,没有执行里面的任何代码, 要启动生成器,需要c.next() 或者c.send(None)来启动。
    2. produce(c) 把c生成器传进去producer,然后start_up=c.next() 这里就是启动了consumer()生成器,
      启动的意思是,开始运行生成器,直到遇到yield , 就会把yield后面的内容返回,并且回到原来的地方,
      这里遇到了yield r, 相当于把r变量返回(想象成return r), 并且回到执行c.next()的函数(producer)来,继续执行producer的代码

      所以start_up = c.next() 做了两件事:
      • 进去了另外一个函数consumer()执行, 直到遇到yield
      • 遇到 yield r, 然后就回到producer里面了,顺便把r的值给到start_up, 这里是start_up="i am start now!!!"
      • 保留现场,这次停留在哪个yield, 下次回来就会在这个yield继续
    3. 上面启动了consumer生成器后回到来,producer()继续跑下面的代码, 遇到res=c.send(1),
      这里的c.send() 相当于 c.next(),也是重新回到生成器里面执行 ,只不过send()可以带参数,把参数带过去生成器。
      所以 res = c.send(1), 做了几件事:
      • 重新进入consumer()生成器,回到上一次跳出来的yield位置, 也就是yield r
      • 并且不同于直接c.next(), c.send(1)带了一个参数1, 而 consumer 里面的i =yield r, 那这次回来就会让yield r 这个表达式赋值成参数 1, 也就是i=yield r 变成了 i = 1, 这就是send(1) 的作用,传递参数
      • i = 1后,继续再生成器consumser 里面执行代码, print 'consuming task %s' % i 就会输出 consuming task 1
      • 让 r = '200 Done', 在循环来到 yield r , 相当于yield '200 Done', 返回200 Done ,并且跳出生成器,回到producer()继续执行。

    生产者消费者模式就这样,通过协程交替循环工作。如果不用协程的话,一个线程,要么做生产者,要么做消费者,不能让他们切换工作,只能使用多线程,分别运行生产者,消费者。
    当然,虽然协程可以切换运行,但毕竟它只有一个线程,只能在代码之前来回切换运行,不能有并行运行。

    总结:

    来到这里, 就有点像tornado 异步协程的模型了: producer()类似IOLoop一直在循环,由它来产生事件,再跳出来consumer(),当让可以有N个consumer(),让他们处理。
    producer()就是一个调度器,可以控制事件扔给哪个协程去处理, 因为协程可以随时切回来顶级调度器。比如我们可以设定当i是偶数给consumer1()处理, 奇数给consumer2()处理,都是可以的,让producer()作为一个顶级调度器

    tornado 协程

    从上面可以看到,Generator已经具备协程的一些能力。如:能够暂停执行,保存状态;能够恢复执行;能够异步执行。

    但是此时Generator还不是一个协程。一个真正的协程能够控制代码什么时候继续执行。而一个Generator执行遇到一个yield表达式 或者语句,会将执行控制权转移给调用者。

    在维基百科中提到,可以实现一个顶级的调度子例程,将执行控制权转移回Generator,从而让它继续执行。在tornado中,ioLoop就是这样的顶级调度子例程,每个协程模块通过,函数装饰器coroutine和ioLoop进行通信,从而ioLoop可以在协程模块执行暂停后,在合适的时机重新调度协程模块执行。

    不过,接下来还不能介绍coroutine和ioLoop,在介绍这两者之前,先得明白tornado中在协程环境中一个非常重要的类Future.

    类比

    就好比上面的producer()作为一个顶级生产者,调度器,可以分配任务给任何消费者生成器,适当时候在执行暂停后,在合适的时机重新调度协程模块执行。

    Future类

    Future封装了异步操作的结果。实际是它类似于在网页html前端中,图片异步加载的占位符,但加载后最终也是一个完整的图片。Future也是同样用处,tornado使用它,最终希望它被set_result,并且调用一些回调函数。Future对象实际是coroutine函数装饰器和IOLoop的沟通使者,有着非常重要的作用。
    参考 tornado协程(coroutine)原理

    异步非阻塞例子

     

    class GenHandler(tornado.web.RequestHandler):
        @gen.coroutine
        def get(self):
            url = 'http://www.baidu.com'
            http_client = AsyncHTTPClient()
            response = yield http_client.fetch(url)
            s = yield gen.sleep(5) # 该阻塞还是得阻塞, 异步只是对其他链接而言的
            self.write(response.body)
    
    class MainHandler(tornado.web.RequestHandler):
       ...
    
    if __name__ == "__main__":
        application = tornado.web.Application([
            (r"/", MainHandler),
            (r"/gen_async/", GenHandler),
        ], autoreload=True)
        application.listen(8889)
        tornado.ioloop.IOLoop.current().start()
    
    

    上面的GenHandler便是使用了协程异步的例子, 当我们请求/gen_async/时,这个请求会请求百度网页和sleep 5秒,当然这个gen.sleep()是非阻塞的。但是我们请求这个url还是会停止5秒后才完成响应。
    然而 在这个5秒等待中,我们请求 / 还是tornado还是能直接给我们响应,而不是要等5秒过后才能响应!
    这里要强调的是:这里的异步非阻塞是针对另一请求来说的,本次的请求该是阻塞的仍然是阻塞的。

    那么我们来分析一下tornado是如何在单线程情况下,一个请求被阻塞,另外的请求还可以处理响应,实现异步的。
    首先我们先分析比较简单的yield gen.sleep(5):

    1. 查看@gen.coroutine这个装饰器源码,看他工作原理

       

    • 首先result=func(*args, **kwargs) 相当于获得这个生成器对象, 也就是def get()这个生成器,但是也只是返回生成器而已,没有执行里面任何代码,需要next()或者send()才会启动生成器
    • 来到 yielded = next(result) 这里就是启动了生成器,来到get() 里面的yield ,然后返回yield后面的内容,也就是gen.sleep(5) , 返回这个函数,也就是执行这个函数gen.sleep() 然后获取gen.sleep()里面的返回值, 交给yield, 所以假如gen.sleep(5) 函数最后执行的结果是return 5 ,那执行完以后就yield gen.sleep() 相当于yield 5 所以yield gen.sleep(5)是yield 这个表达式的返回值,跟yield 一个定值是一样的,只不过要执行完gen.sleep() 才会得到这个定值!
      所以当yield gen.sleep()的时候,就是进去执行了这个异步函数gen.sleep()。所以我们要进去看这个gen.sleep()究竟做了什么, 我们进去看看代码:

     

    def sleep(duration):
        f = Future()
        IOLoop.current().call_later(duration, lambda: f.set_result(None))
        return f
    
    • 首先可以确定的是,这个gen.sleep()返回值是一个future()对象。那么上面的yielded = next(result),实际上被赋值的就是这个future()对象
    • 其次,给IOLOOP循环通过add_timeout() 添加了一个callback, 这个add_timeout()本质上就是add_callback, 只是指定多少秒后执行这个callback, 所以这一步最核心的功能就是给ioloop添加一个5秒后执行的callback, 而这个callback就是匿名函数, 执行f.set_result(None), 让future.set_result(),完成这个future的填充。ioloop会在每次循环检查执行这些callback,由于 ioloop添加的callback设定了时间,所以在5秒后的循环会执行这个函数。并不是下面说的,ioloop在5秒后再添加callback,而是立即添加了callback,设定了5秒后执行
      注意: ioloop不会判断是否该执行callback, 它只会在每次循环中都执行callback, 所以是否执行callback,应该交给future来决定,所以就有了future.add_done_callback( lambda future: self.add_callback(callback, future)) 这样的函数,意思就是当future 完成后给ioloop添加callback。ioloop只管执行callback, future管啥时候添加callback给ioloop。

    Coroutine和IoLoop是如何切换的(之前一直想不明白)

     

    class GenAsyncHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            http_client = AsyncHTTPClient()
            response = yield http_client.fetch("http://example.com")
            do_something_with_response(response)
            self.render("template.html")
    
    • 这里有三个函数概念:
      1. 装饰器函数 a() 对应上面的@gen.coroutine()
      2. 被包裹的函数 b() 对应上面的get()
      3. 被包裹函数里面的异步执行函数 c() 对应上面的 http_client.fetch(url)
    • Coroutine和IoLoop是如何切换的(之前一直想不明白) 先看看@gen.coroutine是干嘛的
      参考上面截图,他就是一个装饰器,装饰器是一个返回函数的高阶函数,所以被这个gen.coroutine装饰器包裹的函数b()都是相当于返回了另外一个函数a(),而b()只是在a()函数的一部分:

     

    """返回生成器, 还没启动生成器"""
    result = func(*args, kwargs) 
    """启动生成器, 如果func()里面是 yield gen.sleep(4) 或者 yield async.fetch(url)
    那么yielded 就被赋值成他们的返回值,而他们一般是返回future对象
    这里实际上就跳进去执行了gen.sleep()或者async.fetch()"""
    yielded = next(result) 
    """将这个yielded 放进去runner,runner的作用实际上就是进去注册一个当future完成的callback, 就是run() 
    run()就是执行gen.send()的地方"""
    Runner(result, future, yielded)
    
    • 所以当执行到yielded = next(result)时,是进去到b()函数的yield地方,进去yield后面的函数执行, 也就是异步函数c() ,(当然这个c()函数只是给ioloop添加一个callback,并不是阻塞同步执行)并且停止跳出来a(),然后继续a()函数其他代码,当yield 的时候,这时运行完c(), 已经从b()跳出来到a()函数了(也就是回到主线程IOloop的循环, 只要不是进去子协程,都是主循环)。

    所以这个高阶装饰器比较难理解, 这个装饰器实际上是另外一个函数a(), 包含了被包裹的原来函数b(), 而在这个高阶函数里面,执行原来的函数b(),也就是生成器,都是在装饰器这个新的函数a()里面操作的。直到遇到func()也就是执行b(), next启动生成器b()。
    遇到yield,执行yield 的 c(),获得c()的返回值。这样就取出c()的返回值,
    出来到高阶函数,也就是从原函数b()停止了。
    yielded 取出了异步函数c()的返回值, 比如yield fetch() yield gen.sleep() 的返回值,也就是future占位符,随后拿着这个占位符,去runner()那里注册: 当future完成后,执行run()函数, run()的作用就是gen.send(),可以重新回到原函数!
    问:那么,什么时候是future被完成呢?也就是什么时候被future.set_result()。是我们需要知道的。
    答:当我们yield c()的时候,不就是进去这个c()函数执行吗, 就是这个时候,在c()里面,注册了一个函数给ioloop,让它下次循环执行,执行完自然就会set_result()拉,然后就会再下次循环中知道对应run()执行,发送gen.send(value)这个结果给原函数!

    最后在看一个例子:

     

    import tornado.ioloop
    from tornado.gen import coroutine
    from tornado.concurrent import Future
    @coroutine
    def get_web():                                                                             
         #http_client = AsyncHTTPClient()
         http_client = HTTPClient()
         response = yield http_client.fetch("http://example.com")
         print 'status_code is %s' % response.code
    
    @coroutine
    def asyn_sum(a, b):
        print("begin calculate:sum %d+%d"%(a,b))
        future = Future()
    
        def callback(a, b):
            print("calculating the sum of %d+%d:"%(a,b))
            future.set_result(a+b)
        tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)
    
        result = yield future
    
        print("after yielded")
        print("the %d+%d=%d"%(a, b, result))
    
    def main():
        asyn_sum(2,3)
        print 'haha'
        tornado.ioloop.IOLoop.instance().start()
    
    if __name__ == "__main__":
        main()
    

    解析:

    • 定义一个callback添加到ioloop里面,这个callback有future.set_result()的功能,相当于上面说的 yield c(), 当执行c()会把这个函数添加到ioloop,然后让它执行,最后set_result()。 这里只是手动添加而已了。
    • 然后定义了一个future , 遇到yield future的时候,进入gen.coroutine装饰器高阶函数,还是按顺序执行

     

    result = asyn_sum(2,3)
    yielded = next(result)#  把asyn_sum里面定义的future对象拿过来了
    future = Future() # 在新建一个新的future 跟上面yielded 这个future是不一样的
    Runner(result, future, yielded) # 把yielded 进去runner注册, yielded完成后执行runner(),返回到asyn_sum(), 
    
    • 所以再高阶函数执行完后,asyn_sum(2,3)返回的是新创建的future对象,并且从async_sum() yield那里开始出来。所以这时print 'haha' 然后再print after yielded
    • run()函数执行的内容我们看看源码:

     

    while True:
        try:
            value = future.result()
            yielded = self.gen.send(value)
             ...
        except (StopIteration, Return) as e:
            if self.pending_callbacks and not self.had_exception:
                ...
                self.result_future.set_result(_value_from_stopiteration(e))
                self.result_future = None
                return
    ...
    

    Tornado异步编程

    @gen.coroutine
    并不是所有函数加了个装饰gen.coroutine就会变成异步,比如上面的例子get_web()如果里面的httpclient使用了同步库HTTPClient(), 再yield http_client.fetch()会报错

     

    raise BadYieldError("yielded unknown object %r" % (yielded,))
    

    因为这个库本身是同步的,你yield 的时候,直接执行了这个同步函数,然后返回的是response,给到这个装饰器的时候,handle_yielded 检测不到是一个可转换的future就会报错,所以要用上异步功能,必须使用符合tornado异步库要求的库,它们会把执行的函数添加到ioloop并且返回future,并在完成的时候future.set_result()

    @asynchronous
    tornado在使用gen.coroutine协程做异步编程之前是用@asynchronous这个装饰器来异步编程的。

     

    class AsyncHandler(RequestHandler):
        @asynchronous
        def get(self):
            http_client = AsyncHTTPClient()
            http_client.fetch("http://example.com",
                              callback=self.on_fetch)
    
        def on_fetch(self, response):
            do_something_with_response(response)
            self.render("template.html")
    
    • 它的原理就是当异步调用 http_client.fetch() 时,进去执行,还是跟上面一样,没有同步执行,等待返回,而是创建一个future(), 然后把这个callback on_fetch() 注册到ioloop里,等这个future()被set_result()了就会执行on_fetch()

    • 而真正要做的request = fetch()也要下达命令执行,由于这个是非阻塞的,由epollIO复用机制通知是否完成, 所以是发起这个请求通过add_handler(fd, callback)来加进ioloop循环,然后ioloop等待epoll的通知,fd有数据可读了,说明请求完成, 就可以执行fetch()的callback,通过源码知道,这个callback是:给上面创建的future set_result。
      意思就是当请求完了,EPOLL通知ioloop, ioloop执行这个fd的callback,也就是设置这个请求完成了,future.set_result()

     

     def handle_response(response):
                if raise_error and response.error:
                    future.set_exception(response.error)
                else:
                    future.set_result(response)
    
    • 一旦这个future.set_result()了,就会执行第一步给ioloop添加的callback, 也就是我们在代码上写的on_fetch()函数。 这个过程行云流水。

    所以无论是用callback()方式, 还是gen.corountin来实现异步,他们的核心都是一样,通过add_callback,和非阻塞任务add_handler()来注册任务或者非阻塞socket。

    参考

    tornado协程(coroutine)原理
    真正的 Tornado 异步非阻塞
    浅析tornado协程运行原理
    廖雪峰的官方网站-协程
    廖雪峰的官方网站-装饰器
    现代魔法学院

     

    作者:大富帅
    链接:https://www.jianshu.com/p/d63a0ab93805
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

     

    展开全文
  • 主要介绍了简单介绍Python的Tornado框架中的协程异步实现原理,作者基于Python的生成器讲述了Tornado异步的特点,需要的朋友可以参考下
  • 主要介绍了详细解读tornado协程(coroutine)原理,涉及协程定义,生成器和yield语义,Future对象,ioloop对象,函数装饰器coroutine等相关内容,具有一定借鉴价值,需要的朋友可以参考下
  • Tornado异步原理分析

    千次阅读 2012-01-07 14:23:30
    猛击这里
  • Python Web框架 tornado 异步原理 参考:http://www.jb51.net/article/64747.htm 待整理 转载于:https://www.cnblogs.com/hellojesson/p/6420243.html
  • 第2节 tornado是如何实现做到高并发的原理 - tornado异步编程详解 第3节 tornado容易产生的几个误解 第4节 tornado中async和await及coroutine装饰器的介绍 第5节 通过socket的阻塞和非阻塞演示来解说 tornado协程...
  • tornado异步非阻塞原理

    2019-08-04 17:21:00
    1.tornado框架核心代码分析(Snow类注释) 1.每个请求过来就会创建一个socket对象,并调用select去监听连接,select会将所有请求放到readable_list列表中 2.使用while不断执行for循环遍历readable_list,如果是...
  • Python web框架 Tornado(二)异步非阻塞 异步非阻塞 阻塞式:(适用于所有框架,Django,Flask,Tornado,Bottle)  一个请求到来未处理完成,后续一直等待  解决方案:多线程,多进程 异步非阻塞(存在IO请求): ...
  • tornado实现异步非阻塞

    2020-11-04 09:04:30
    1、使用 tornado.gen.coroutine 异步编程(需要第三方库支持tornado异步) 同步阻塞 code # coding=utf-8 # @Time : 2020/11/3 15:40 # @Author : Leo # @Email : l1512102448@qq.com # @File : demo_tornado_asy.py...
  • 相信大家学习python,有听过flask、django、tornado 这些web框架。那么这些漂亮的框架的应用场景是什么呢。 要性能,Tornado 首选;要开发速度,Django 和Flask 都行, 区别是Flask 把许多功能交给第三方库去完成了...
  • 真正的 Tornado 异步非阻塞

    万次阅读 2017-05-31 16:46:35
    其中 Tornado 的定义是 Web 框架和异步网络库,其中他具备有异步非阻塞能力,能解决他两个框架请求阻塞的问题,在需要并发能力时候就应该使用 Tornado。但是在实际使用过程中很容易把 Tornado 使用成异步阻塞框架,...
  • Tornado异步阻塞解决方案
  • 目录 7.1 认识异步 1. 同步 ...7.2 Tornado异步 1. tornado.httpclient.AsyncHTTPClient fetch(request, callback=None) HTTPRequest HTTPResponse 2. 测试接口 3. 回调异步 tornado.we...
  • Tornado异步非阻塞

    2018-11-24 07:37:34
    随便问一个Python开发者,Tornado框架和Django/Flask之类的框架有什么区别,十有八九会回答,Tornado支持异步非阻塞。Tornado和其他两个框架最大的区别在于,Tornado既是web框架同时又是异步网络库,也是web服务器。...
  • 第2节 tornado是如何实现做到高并发的原理 - tornado异步编程详解 第3节 tornado容易产生的几个误解 第4节 tornado中async和await及coroutine装饰器的介绍 第5节 通过socket的阻塞和非阻塞演示来解说 tornado协程的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,290
精华内容 916
关键字:

tornado异步原理