app的环境 能开发react
2013-09-19 09:10:57 u011676589 阅读数 1346



2014-11-05 12:19:27 whereismatrix 阅读数 739

Reactor and Proactor

[原文:http://blog.csdn.net/wenbingoon/article/details/9880365]

     1 概述

IO读写时,多路复用机制都会依赖对一个事件多路分离器,负责把源事件的IO 事件分离出来,分别到相应的read/write事件分离器。涉及到事件分离器的两种模式分别就是 Reactor和Proactor,Reactor是基于同步IO的,Proactor是基于异步IO的。

在Reactor模式中,事件分离者等待某个事件或者可应用或个操作的状态发生(比如文件描述符可读写,或者是socket可读写),事件分离者就把这个事件传给事先注册的事件处理函数或者回调函数,由后者来做实际的读写操作。Reactor模式主要是提高系统的吞吐量,理解反应器模式的例子Reactor模式,或者叫反应器模式

在Proactor模式中,事件处理者(或者代由事件分离者发起)直接发起一个异步读写操作(相当于请求),而实际的工作是由操作系统来完成的。发起时,需要提供的参数包括用于存放读到数据的缓存区,读的数据大小,或者用于存放外发数据的缓存区,以及这个请求完后的回调函数等信息。事件分离者得知了这个请求,它默默等待这个请求的完成,然后转发完成事件给相应的事件处理者或者回调。举例来说,在Windows上事件处理者投递了一个异步IO操作(称有 overlapped的技术),事件分离者等IOCompletion事件完成. 这种异步模式的典型实现是基于操作系统底层异步API的,所以我们可称之为“系统级别”的或者“真正意义上”的异步,因为具体的读写是由操作系统代劳的。

举个例子,将有助于理解Reactor与Proactor二者的差异,以读操作为例(类操作类似)。

在Reactor中实现读

- 注册读就绪事件和相应的事件处理器

- 事件分离器等待事件

- 事件到来,激活分离器,分离器调用事件对应的处理器。

- 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权。

与如下Proactor(真异步)中的读过程比较:

- 处理器发起异步读操作(注意:操作系统必须支持异步IO)。在这种情况下,处理器无视IO就绪事件,它关注的是完成事件。

- 事件分离器等待操作完成事件

- 在分离器等待过程中,操作系统利用并行的内核线程执行实际的读操作,并将结果数据存入用户自定义缓冲区,最后通知事件分离器读操作完成。

- 事件分离器呼唤处理器。

- 事件处理器处理用户自定义缓冲区中的数据,然后启动一个新的异步操作,并将控制权返回事件分离器。

可以看出,两个模式的相同点,都是对某个IO事件的事件通知(即告诉某个模块,这个IO操作可以进行或已经完成)。在结构上,两者也有相同点:demultiplexor负责提交IO操作(异步)、查询设备是否可操作(同步),然后当条件满足时,就回调handler;

  不同点在于,异步情况下(Proactor),当回调handler时,表示IO操作已经完成;同步情况下(Reactor),回调handler时,表示IO设备可以进行某个操作(can read or can write),handler这个时候开始提交操作。

2、Reactor模式

     Reactor释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。使用Libevent也是想Libevent框架注册相应的事件和回调函数;当这些时间发声时,Libevent会调用这些回调函数处理相应的事件(I/O读写、定时和信号)。
    用“好莱坞原则”来形容Reactor再合适不过了:不要打电话给我们,我们会打电话通知你。
 


3、两个模式简单对比

     两个模式的相同点:(1)都是对某个IO事件的事件通知(即告诉某个模块,这个IO操作可以进行或已经完成)。(2)在结构上的相同点:demultiplexor负责提交IO操作(异步)、查询设备是否可操作(同步),然后当条件满足时,就回调handler。

     不同点在于:异步情况下(Proactor),当回调handler时,表示IO操作已经完成;同步情况下(Reactor),回调handler时,表示IO设备可以进行某个操作(can read or can write),handler这个时候开始提交操作。

     我的理解:两者的根本区别就在于《Unix网络编程第一卷:套接口API》第6章讲解的五种I/O模型,Proactor是基于异步I/O,Reactor是同步I/O(一般是I/O复用)。但是现在的操作系统并不是都能很好的真正支持异步I/O,比如Windows里有真正的异步I/O——IOCP,而Unix、Linux并没有真正实现异步I/O。所以考虑程序移植性以及现在很多服务器基于Unix,Linux;Proactor封装了这种差异,在内部异步事件分离器实现时根据系统的不同调用相应的I/O模式。



二、BIO、NIO、AIO

 NIO通常采用Reactor模式,AIO通常采用Proactor模式。AIO简化了程序的编写,stream的读取和写入都有OS来完成,不需要像NIO那样子遍历Selector。Windows基于IOCP实现AIO,Linux只有eppoll模拟实现了AIO。

Java7之前的JDK只支持NIO和BIO,从7开始支持AIO。

4种通信方式:TCP/IP+BIO, TCP/IP+NIO, UDP/IP+BIO, UDP/IP+NIO。

TCP/IP+BIO、

Socket和ServerSocket实现,ServerSocket实现Server端端口监听,Socket用于建立网络IO连接。

不适用于处理多个请求 1.生成Socket会消耗过多的本地资源。2. Socket连接的建立一般比较慢。

BIO情况下,能支持的连接数有限,一般都采取accept获取Socket以后采用一个thread来处理,one connection one thread。无论连接是否有真正数据请求,都需要独占一个thread。

可以通过设立Socket池来一定程度上解决问题,但是使用池需要注意的问题是:1. 竞争等待比较多。 2. 需要控制好超时时间。

TCP/IP+NIO

使用Channel(SocketChannel和ServerSocketChannel)和Selector。

Server端通常由一个thread来监听connect事件,另外多个thread来监听读写事件。这样做的好处是这些连接只有在真是请求的时候才会创建thread来处理,one request one thread。这种方式在server端需要支持大量连接但这些连接同时发送请求的峰值不会很多的时候十分有效。

UDP/IP+BIO

DatagramSocket和DatagramPacket。DatagramSocket负责监听端口以及读写数据,DatagramPacket作为数据流对象进行传输。

UDP/IP是无连接的,无法进行双向通信,除非双方都成为UDP Server。

UDP/IP+NIO

通过DatagramChannel和ByteBuffer实现。DatagramChannel负责端口监听及读写。ByteBuffer负责数据流传输。

如果要将消息发送到多台机器,如果为每个目标机器都建立一个连接的话,会有很大的网络流量压力。这时候可以使用基于UDP/IP的Multicast协议传输,Java中可以通过MulticastSocket和DatagramPacket来实现。

Multicast一般多用于多台机器的状态同步,比如JGroups。SRM, URGCP都是Multicast的实现方式。eBay就采用SRM来实现将数据从主数据库同步到各个搜索节点机器。


------------------

就IO而言:概念上有5中模型:blocking I/O,nonblocking I/O,I/O multiplexing (select and poll),signal driven I/O (SIGIO),asynchronous I/O (the POSIX aio_functions)。

不同的操作系统对上述模型支持不同: unix支持io多路复用,不同系统叫法不同 :freebsd里面叫 kqueue;linux 是epoll。而windows: 2000的时候就诞生了IOCP支持最后一种异步I/O
java是一种跨平台语言,为了支持异步IO,诞生了nio,Java1.4引入的NIO 1.0是基于I/O复用的。在各个平台上会选择不同的复用方式。Linux用的epoll,BSD上用kqueue,Windows上应该是重叠I/O(肯定不是IOCP)。
 
NIO 2.0(Java1.7)里终于有AIO了,Linux上用AIO,Windows上用IOCP,都支持了概念上的最后一种IO -- asynchronous I/O 

-------------------

[原文: http://blog.sina.com.cn/s/blog_9a97a37c0101aahl.html]

Reactor and Proactor两个模式的相同点,都是对某个IO事件的事件通知(即告诉某个模块,这个IO操作可以进行或已经完成)。在结构上,两者也有相同点:demultiplexor负责提交IO操作(异步)、查询设备是否可操作(同步),然后当条件满足时,就回调handler。

不同点在于,异步情况下(Proactor),当回调handler时,表示IO操作已经完成;同步情况下(Reactor),回调handler时,表示IO设备可以进行某个操作(can read or can write),handler这个时候开始提交操作。

用select模型写个简单的reactor,大致为:

Proactor和Reactor模式///
Proactor和Reactor模式class handler
Proactor和Reactor模式
{
Proactor和Reactor模式
public:
Proactor和Reactor模式    
virtual void onRead() = 0;
Proactor和Reactor模式    
virtual void onWrite() = 0;
Proactor和Reactor模式    
virtual void onAccept() = 0;
Proactor和Reactor模式}

Proactor和Reactor模式
Proactor和Reactor模式
class dispatch
Proactor和Reactor模式
{
Proactor和Reactor模式
public:
Proactor和Reactor模式    
void poll()
Proactor和Reactor模式    
{
Proactor和Reactor模式        
// add fd in the set.
Proactor和Reactor模式        
//
Proactor和Reactor模式        
// poll every fd
Proactor和Reactor模式
        int = select( 0&read_fd, &write_fd, 00 );
Proactor和Reactor模式        
if> 0 )
Proactor和Reactor模式        
{
Proactor和Reactor模式            
for each fd in the read_fd_set
Proactor和Reactor模式            
   if fd can read
Proactor和Reactor模式                    _handler
->onRead();
Proactor和Reactor模式                
if fd can accept
Proactor和Reactor模式                    _handler
->onAccept();
Proactor和Reactor模式            }
 
Proactor和Reactor模式
Proactor和Reactor模式            
for each fd in the write_fd_set
Proactor和Reactor模式            
{
Proactor和Reactor模式                
if fd can write
Proactor和Reactor模式                    _handler
->onWrite();
Proactor和Reactor模式            }

Proactor和Reactor模式        }

Proactor和Reactor模式    }
 
Proactor和Reactor模式
Proactor和Reactor模式    
void setHandler( handler *_h )
Proactor和Reactor模式    
{
Proactor和Reactor模式        _handler 
= _h;
Proactor和Reactor模式    }
 
Proactor和Reactor模式
Proactor和Reactor模式
private:
Proactor和Reactor模式    handler 
*_handler;
Proactor和Reactor模式}

Proactor和Reactor模式
Proactor和Reactor模式
/// application
Proactor和Reactor模式class MyHandler public handler
Proactor和Reactor模式
{
Proactor和Reactor模式
public:
Proactor和Reactor模式    
void onRead()
Proactor和Reactor模式    
{
Proactor和Reactor模式    }
 
Proactor和Reactor模式
Proactor和Reactor模式    
void onWrite()
Proactor和Reactor模式    
{
Proactor和Reactor模式    }
 
Proactor和Reactor模式
Proactor和Reactor模式    
void onAccept()
Proactor和Reactor模式    
{
Proactor和Reactor模式    }

Proactor和Reactor模式}

Proactor和Reactor模式
Proactor和Reactor模式


在网上找了份Proactor模式比较正式的文档,其给出了一个总体的UML类图,比较全面:

proactor_uml

根据这份图我随便写了个例子代码:

Proactor和Reactor模式class AsyIOProcessor
Proactor和Reactor模式
{
Proactor和Reactor模式
public:
Proactor和Reactor模式    
void do_read()
Proactor和Reactor模式    
{
Proactor和Reactor模式        
//send read operation to OS
Proactor和Reactor模式        
// read io finished.and dispatch notification
Proactor和Reactor模式
        _proactor->dispatch_read();
Proactor和Reactor模式    }
 
Proactor和Reactor模式
Proactor和Reactor模式
private:
Proactor和Reactor模式    Proactor 
*_proactor;
Proactor和Reactor模式}

Proactor和Reactor模式
Proactor和Reactor模式
class Proactor
Proactor和Reactor模式
{
Proactor和Reactor模式
public:
Proactor和Reactor模式    
void dispatch_read()
Proactor和Reactor模式    
{
Proactor和Reactor模式        _handlerMgr
->onRead();
Proactor和Reactor模式    }
 
Proactor和Reactor模式
Proactor和Reactor模式
private:
Proactor和Reactor模式    HandlerManager 
*_handlerMgr;
Proactor和Reactor模式}

Proactor和Reactor模式
Proactor和Reactor模式
class HandlerManager
Proactor和Reactor模式
{
Proactor和Reactor模式
public:
Proactor和Reactor模式    typedef std::list
<Handler*> HandlerList; 
Proactor和Reactor模式
Proactor和Reactor模式
public:
Proactor和Reactor模式    
void onRead()
Proactor和Reactor模式    
{
Proactor和Reactor模式        
// notify all the handlers.
Proactor和Reactor模式
        std::for_each( _handlers.begin(), _handlers.end(), onRead );
Proactor和Reactor模式    }
 
Proactor和Reactor模式
Proactor和Reactor模式
private:
Proactor和Reactor模式    HandlerList 
*_handlers;
Proactor和Reactor模式}

Proactor和Reactor模式
Proactor和Reactor模式
class Handler
Proactor和Reactor模式
{
Proactor和Reactor模式
public:
Proactor和Reactor模式    
virtual void onRead() = 0;
Proactor和Reactor模式}

Proactor和Reactor模式
Proactor和Reactor模式
// application level handler.
Proactor和Reactor模式
class MyHandler public Handler
Proactor和Reactor模式
{
Proactor和Reactor模式
public:
Proactor和Reactor模式    
void onRead() 
Proactor和Reactor模式    
{
Proactor和Reactor模式        
// 
Proactor和Reactor模式
    }

Proactor和Reactor模式}

Proactor和Reactor模式
Proactor和Reactor模式


Reactor通过某种变形,可以将其改装为Proactor,在某些不支持异步IO的系统上,也可以隐藏底层的实现,利于编写跨平台
代码。我们只需要在dispatch(也就是demultiplexor)中封装同步IO操作的代码,在上层,用户提交自己的缓冲区到这一层,
这一层检查到设备可操作时,不像原来立即回调handler,而是开始IO操作,然后将操作结果放到用户缓冲区(读),然后再
回调handler。这样,对于上层handler而言,就像是proactor一样。



----

其他参考:

两种高性能I/O设计模式(Reactor/Proactor)的比较  http://blog.sina.com.cn/s/blog_5f435c130101ktl6.html


2010-08-18 13:03:00 yunccll 阅读数 4125

 

select.select(rlist, wlist, xlist[, timeout])

 

> 平台依赖性:

 Windows只支持socket作为fd,不支持文件的fd

 Linux 和Unix平台都支持

 

 

> 输入的文件描述符号可以为如下几种情况:

python file objects: 

              1. sys.stdin  

              2. open()   或者 os.popen()

              3. socket.socket()

              4. 其他可以调用fileno()的对象返回的文件描述符号(这个是真实的 fd,不仅仅是一个随机整数)


> 前三个参数是 ‘waitable objects’的序列,要么是整数代表的fd,或者是由fileno()返回的对象才能作为三个序列的元素

rlist: wait until ready for reading
wlist: wait until ready for writing
xlist: wait for an “exceptional condition” (see the manual page for what your system considers such a condition)

三个参数是否全为空是平台相关的,Unix系统是可以全为空的,Windows不能全为空.

Unix全为空的话,可以作为定时器用了。

 

> timeout 参数

timeout参数是浮点数代表的参数,单位是seconds

timeout参数省略(默认为None),则代表永远block直到有一个fd准备好。

timeout = 0, 代表不会block,或者(specifies a poll)

 

> return 参数

返回值为三个list,包含三个已经准备好的fd列表(不会超过输入三个参数)。

当timeout后还没有fd准备好,则返回三个空列表([],[],[])

 

 

python 的 List Comprehensions语法:

这个语法的目的是用于创建列表,其他相似的语法还有(map,filter, zip 和 lambda)

[f(x) 跟随 0 个 或多个 for 或 if 子句]

[f(x)]

[f(x) for x]

[f(x) for x... if .....]

[f(x, y) for x..for y...]

[f(x, y) for x.. if ..... for y... if ....]

 

2013-01-26 01:10:52 Mirage520 阅读数 636

这是标题所提的ACE学习教程第六章里面的一个实例

#include "ace/Reactor.h" 
#include "ace/SOCK_Acceptor.h" 

#define PORT_NO 19998 
typedef ACE_SOCK_Acceptor Acceptor; 
//forward declaration 
class My_Accept_Handler; 

//数据处理器 
class My_Input_Handler: public ACE_Event_Handler 
{ 
    public: 
    //构造方法 
    My_Input_Handler() 
    { 
    ACE_DEBUG((LM_DEBUG,"Constructor\n")); 
    } 
     
    //回调任何从句柄来的数据 
    int handle_input(ACE_HANDLE) 
    { 
    //获得数据 
    peer_i().recv_n(data,12); 
    ACE_DEBUG((LM_DEBUG,"%s\n",data)); 

//事件处理区 

    return 0; 
    } 
     
    //必须有的,传给handle_input的句柄 
    ACE_HANDLE get_handle(void)    
    { 
    return this->peer_i().get_handle(); 
    } 
     
                           
    ACE_SOCK_Stream &peer_i() 
    { 
    return this->peer_; 
    } 
     
    private: 
    ACE_SOCK_Stream peer_; //事件处理I/O入口,缓冲区 
    char data [12]; 
}; 

//接受处理器 
class My_Accept_Handler: public ACE_Event_Handler 
{ 
public: 
//构造 
My_Accept_Handler(ACE_Addr &addr) 
{ 
this->open(addr); 
} 

//开始监听 
//接受来自客户端的请求 
int open(ACE_Addr &addr) 
{ 
peer_acceptor.open(addr); 
return 0; 
} 

//重载I/O处理区,client来处理 
int handle_input(ACE_HANDLE handle) 
{ 
//已经来了一个client,创建数据处理器实例去处理 
My_Input_Handler *eh= new My_Input_Handler(); 

//Accept the connection "into" the Event Handler 
if (this->peer_acceptor.accept (eh->peer_i(), // 流对象 
0, // 远程地址 
0, // timeout 
1) ==-1) //restart if interrupted 
ACE_DEBUG((LM_ERROR,"Error in connection\n")); 

ACE_DEBUG((LM_DEBUG,"Connection established\n")); 

// 把该处理器绑定到单体反应器 中 
ACE_Reactor::instance()->register_handler(eh,ACE_Event_Handler::READ_MASK); 

//如果你想处理更多的客户端就返回 -1,如果想永久多个客户端,返回 0 
return 0; 
} 
//必须有的,传给handle_input的句柄 
ACE_HANDLE get_handle(void) const 
{ 
return this->peer_acceptor.get_handle(); 
} 
private: 
Acceptor peer_acceptor; 
}; 


int main(int argc, char * argv[]) 
{ 

ACE_INET_Addr addr(PORT_NO); 

My_Accept_Handler *eh=new My_Accept_Handler(addr); 

ACE_Reactor::instance()->register_handler(eh, ACE_Event_Handler::ACCEPT_MASK); 

while(1) 
ACE_Reactor::instance()->handle_events(); 
}

转帖:http://raojl.iteye.com/blog/337952

2018-12-02 10:25:02 weixin_33805743 阅读数 6

本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

1 Reactor单线程案例代码热热身

  • 如下是单线程的JAVA NIO编程模型。

  • 首先服务端创建ServerSocketChannel对象,并注册到Select上OP_ACCEPT事件,然后ServerSocketChannel负责监听指定端口上的连接请求。

  • 客户端一旦连接上ServerSocketChannel,就会触发Acceptor来处理OP_ACCEPT事件,并为来自客户端的连接创建Socket Channel,并设置为非阻塞模式,并在其Selector上注册OP_READ或者OP_WRITE,最终实现客户端与服务端的连接建立和数据通道打通。

  • 当客户端向建立的SocketChannel发送请求时,服务端的Selector就会监听到OP_READ事件,并触发相应的处理逻辑。当服务端向客户端写数据时,会触发服务端Selector的OP_WRITE事件,从而执行响应的处理逻辑。

  • 这里有一个明显的问题,就是所有时间的处理逻辑都是在Acceptor单线程完成的,在并发连接数较小,数据量较小的场景下,是没有问题的,但是......

  • Selector 允许一个单一的线程来操作多个 Channel. 如果我们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 但是因为在一个线程中使用了多个 Channel, 因此也会造成了每个 Channel 传输效率的降低.

  • 优化点在于:通道连接|读取或写入|业务处理均采用单线程来处理。通过线程池或者MessageQueue共享队列,进一步优化了高并发的处理要求,这样就解决了同一时间出现大量I/O事件时,单独的Select就可能在分发事件时阻塞(或延时),而成为瓶颈的问题。

      public class NioEchoServer {
      private static final int BUF_SIZE = 256;
      private static final int TIMEOUT = 3000;
    
      public static void main(String args[]) throws Exception {
          // 打开服务端 Socket
          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
          // 打开 Selector
          Selector selector = Selector.open();
    
          // 服务端 Socket 监听8080端口, 并配置为非阻塞模式
          serverSocketChannel.socket().bind(new InetSocketAddress(8080));
          serverSocketChannel.configureBlocking(false);
    
          // 将 channel 注册到 selector 中.
          // 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
          // 注册到 Selector 中.
          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
          while (true) {
              // 通过调用 select 方法, 阻塞地等待 channel I/O 可操作
              if (selector.select(TIMEOUT) == 0) {
                  System.out.print(".");
                  continue;
              }
    
              // 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
              Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
    
              while (keyIterator.hasNext()) {
    
                  SelectionKey key = keyIterator.next();
    
                  // 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
                  keyIterator.remove();
    
                  if (key.isAcceptable()) {
                      // 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
                      // 代表客户端的连接
                      // 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
                      // 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
                      SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                      clientChannel.configureBlocking(false);
                      //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
                      // 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
                      clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                  }
    
                  if (key.isReadable()) {
                      SocketChannel clientChannel = (SocketChannel) key.channel();
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      long bytesRead = clientChannel.read(buf);
                      if (bytesRead == -1) {
                          clientChannel.close();
                      } else if (bytesRead > 0) {
                          key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                          System.out.println("Get data length: " + bytesRead);
                      }
                  }
    
                  if (key.isValid() && key.isWritable()) {
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      buf.flip();
                      SocketChannel clientChannel = (SocketChannel) key.channel();
    
                      clientChannel.write(buf);
    
                      if (!buf.hasRemaining()) {
                          key.interestOps(OP_READ);
                      }
                      buf.compact();
                  }
              }
          }
      }
    复制代码

}

2 Kafka Reactor模式设计思路

  • SelectionKey.OP_READ:Socket 读事件,以从远程发送过来了相应数据

  • SelectionKey.OP_WRITE:Socket写事件,即向远程发送数据

  • SelectionKey.OP_CONNECT:Socket连接事件,用来客户端同远程Server建立连接的时候注册到Selector,当连接建立以后,即对应的SocketChannel已经准备好了,用户可以从对应的key上取出SocketChannel.

  • SelectionKey.OP_ACCEPT:Socket连接接受事件,用来服务器端通过ServerSocketChannel绑定了对某个端口的监听,然后会让其SocketChannel对应的socket注册到服务端的Selector上,并关注该OP_ACCEPT事件。

  • Kafka的网络层入口类是SocketServer。 我们知道,kafka.Kafka是Kafka Broker的入口类,kafka.Kafka.main()是Kafka Server的main()方法,即Kafka Broker的启动入口。我们跟踪代码,即沿着方法调用栈kafka.Kafka.main() -> KafkaServerStartable() -> KafkaServer().startup可以从main()方法入口一直跟踪到SocketServer即网络层对象的创建,这意味着Kafka Server启动的时候会初始化并启动SocketServer。

  • Acceptor的构造方法中,首先通过openServerSocket()打开自己负责的EndPoint的Socket,即打开端口并启动监听。 然后,Acceptor会负责构造自己管理的一个或者多个Processor对象。其实,每一个Processor都是一个独立线程。

       private[kafka] class Acceptor(val endPoint: EndPoint,
                                        val sendBufferSize: Int,
                                        val recvBufferSize: Int,
                                        brokerId: Int,
                                        processors: Array[Processor],
                                        connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
      
        private val nioSelector = NSelector.open()
        val serverChannel = openServerSocket(endPoint.host, endPoint.port)//创建一个ServerSocketChannel,监听endPoint.host, endPoint.port套接字
      
        //Acceptor被构造的时候就会启动所有的processor线程
        this.synchronized {
          //每个processor创建一个单独线程
          processors.foreach { processor =>
            Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
          }
        }
    复制代码
  • Acceptor线程的run()方法,是不断监听对应ServerChannel上的连接请求,如果有新的连接请求,就选择出一个Processor,用来处理这个请求,将这个新连接交付给Processor是在方法Acceptor.accept()

     def accept(key: SelectionKey, processor: Processor) {
         val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]//取出channel
         val socketChannel = serverSocketChannel.accept()//创建socketChannel,专门负责与这个客户端的连接
         try {
           //socketChannel参数设置
           processor.accept(socketChannel)//将SocketChannel交给process进行处理
         } catch {
           //异常处理
         }
       }
     
     //Processor.accept():
      /**
        * Queue up a new connection for reading
        */
       def accept(socketChannel: SocketChannel) {
         newConnections.add(socketChannel)
         wakeup()
       }
    复制代码
  • 每一个Processor都维护了一个单独的KSelector对象,这个KSelector只负责这个Processor上所有channel的监听。这样最大程度上保证了不同Processor线程之间的完全并行和业务隔离,尽管,在异步IO情况下,一个Selector负责成百上千个socketChannel的状态监控也不会带来效率问题。

       override def run() {
          startupComplete()//表示初始化流程已经结束,通过这个CountDownLatch代表初始化已经结束,这个Processor已经开始正常运行了
          while (isRunning) {
            try {
              // setup any new connections that have been queued up
              configureNewConnections()//为已经接受的请求注册OR_READ事件
              // register any new responses for writing
              processNewResponses()//处理响应队列,这个响应队列是Handler线程处理以后的结果,会交付给RequestChannel.responseQueue.同时调用unmute,开始接受请求
              poll()  //调用KSelector.poll(),进行真正的数据读写
              processCompletedReceives()//调用mute,停止接受新的请求
              processCompletedSends()
              processDisconnected()
            } catch {
              //异常处理 略
          }
      
          debug("Closing selector - processor " + id)
          swallowError(closeAll())
          shutdownComplete()
       }
    复制代码
  • KSelector.register()方法,开始对远程客户端或者其它服务器的读请求(OP_READ)进行绑定和处理。KSelect.register()方法,会将服务端的SocketChannel注册到服务器端的nioSelector,并关注SelectionKey.OP_READ,即,如果发生读请求,可以取出对应的Channel进行处理。这里的Channel也是Kafka经过封装以后的KafkaChannel对象

      public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
              SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
              //如果是SocketServer创建的这个对象并且是纯文本,则channelBuilder是@Code PlainTextChannelBuilder
              KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);//构造一个KafkaChannel
              key.attach(channel);//将KafkaChannel对象attach到这个registration,以后可以通过调用SelectionKey.attachment()获得这个对象
              this.channels.put(id, channel);//记录这个Channel
          }
    复制代码
  • Processor.processCompletedReceives()通过遍历completedReceives,对于每一个已经完成接收的数据,对数据进行解析和封装,交付给RequestChannel,RequestChannel会交付给具体的业务处理层进行处理。

    * 将completedReceived中的对象进行封装,交付给requestQueue.completRequets
     */
    private def processCompletedReceives() {
      selector.completedReceives.asScala.foreach { receive =>//每一个receive是一个NetworkReceivedui'xiagn
        try {
          //receive.source代表了这个请求的发送者的身份,KSelector保存了channel另一端的身份和对应的SocketChannel之间的对应关系
          val channel = selector.channel(receive.source)
          val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
            channel.socketAddress)
          val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
          requestChannel.sendRequest(req)//将请求通过RequestChannel.requestQueue交付给Handler
          selector.mute(receive.source)//不再接受Read请求,发送响应之前,不可以再接收任何请求
        } catch {
          //异常处理 略
        }
      }
    }
    复制代码

  • 详情源码剖析请参考如下博客,讲解非常详细。

      https://blog.csdn.net/zhanyuanlin/article/details/76556578
      https://blog.csdn.net/zhanyuanlin/article/details/76906583
    复制代码
  • RequestChannel 负责消息从网络层转接到业务层,以及将业务层的处理结果交付给网络层进而返回给客户端。每一个SocketServer只有一个RequestChannel对象,在SocketServer中构造。RequestChannel构造方法中初始化了requestQueue,用来存放网络层接收到的请求,这些请求即将交付给业务层进行处理。同时,初始化了responseQueues,为每一个Processor建立了一个response队列,用来存放这个Processor的一个或者多个Response,这些response即将交付给网络层返回给客户端。

      //创建RequestChannel,有totalProcessorThreads个responseQueue队列,
        val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
      class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
        private var responseListeners: List[(Int) => Unit] = Nil
        //request存放了所有Processor接收到的远程请求,负责把requestQueue中的请求交付给具体业务逻辑进行处理
        private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
        //responseQueues存放了所有Processor的带出来的response,即每一个Processor都有一个response queue
        private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
        for(i <- 0 until numProcessors) //初始化responseQueues
          responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
      
        //一些metrics用来监控request和response的数量,代码略
        }
    复制代码
  • KafkaApis是Kafka的API接口层,可以理解为一个工具类,职责就是解析请求然后获取请求类型,根据请求类型将请求交付给对应的业务层

      class KafkaRequestHandlerPool(val brokerId: Int,
                                val requestChannel: RequestChannel,
                                val apis: KafkaApis,
                                numThreads: Int) extends Logging with KafkaMetricsGroup {
          
            /* a meter to track the average free capacity of the request handlers */
            private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
          
            this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
            val threads = new Array[Thread](numThreads)
            //初始化由KafkaRequestHandler线程构成的线程数组
            val runnables = new Array[KafkaRequestHandler](numThreads)
            for(i <- 0 until numThreads) {
              runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
              threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
              threads(i).start()
    }
    复制代码
  • KafkaRequestHandler.run()方法,就是不断从requestQueue中取出请求,调用API层业务处理逻辑进行处理

       def run() {
          while(true) {
            try {
              var req : RequestChannel.Request = null
              while (req == null) {
              //略
              req = requestChannel.receiveRequest(300)//从RequestChannel.requestQueue中取出请求
              //略
              apis.handle(req)//调用KafkaApi.handle(),将请求交付给业务
            } catch {}
          }
        }
    复制代码

3 参数调优设置

  • numProcessorThreads:通过num.network.threads进行配置,单个Acceptor所管理的Processor对象的数量。
  • maxQueuedRequests:通过queued.max.requests进行配置,请求队列所允许的最大的未响应请求的数量,用来给ConnectionQuotas进行请求限额控制,避免Kafka Server产生过大的网络负载;
  • totalProcessorThreads:计算方式为numProcessorThreads * endpoints.size,即单台机器总的Processor的数量;
  • maxConnectionsPerIp:配置项为max.connections.per.ip,单个IP上的最大连接数,用来给ConnectionQuotas控制连接数;
  • num.io.threads:表示KafkaRequestHander实际从队列中获取请求进行执行的线程数,默认是8个。

4 总结

  • 通过Acceptor、Processor、RequestChannel、KafkaRequestHandler以及KafkaApis多个角色的解析,完成了整个Kafka的消息流通闭环,即从客户端建立连接、发送请求给Kafka Server的Acceptor进行处理,进一步交由Processor、Kafka Server将请求交付给KafkaRequestHandler具体业务进行处理、业务将处理结果返回给网络层、网络层将结果通过NIO返回给客户端。

  • 由于多Processor线程、以及KafkaRequestHandlerPoll线程池的存在,通过交付-获取的方式而不是阻塞等待的方式,让整个消息处理实现完全的异步化,各个角色各司其职,模块之间无耦合,线程之间或者相互竞争任务,或者被上层安排处理部分任务,整个效率非常高,结构也相当清晰

  • 本文参考了大量技术博客,加上个人的理解,通过走读源码完成这篇学习笔记,辛苦成文,实属不易,各自珍惜。

  • 秦凯新 于深圳

kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

阅读数 1548

本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。1Reactor单线程案例代码热热身如下是单线程的JAVANIO编程模型。首先服务端创建ServerS...

博文 来自: shenshouniu
没有更多推荐了,返回首页