发布react到服务器
2018-03-19 21:37:47 sinat_30545941 阅读数 156

这里写图片描述
并发性不是太高服务器程序使用这个模式基本就可以满足。
主线程只负责监听文件描述上是否有事件,如果有就立即将该事件通知到工作线程。读写数据、接受新的连接以及处理客户端的消息都在工作线程。
工作流:
1)主线程往 epoll 内核事件表中注册 socket 上的读就绪事件
2)主线程调用 epoll_wait 等待 socket 上有数据可读
3)当 socket 上有数据可读时, epoll_wait 通知主线程。主线程则将 socket 可读事件放入请求队列。
4)睡眠在请求队列上的某个工作线程被唤醒,他从 socket 读取数据,并处理客户请求,然后往 epoll 内核事件表中注册该socket 上的写就绪时间。
5)主线程调用 epoll_wait 等待 socket 可写。
6)当 scoket 可写, epoll_wait 通知主线程。主线程将 socket 可写事件放入请求队列。
7)睡眠在请求队列上的某个工作线程被唤醒,它往 socket 上写入服务器处理客户端请求的结果。

公司游戏后端架构基本遵循此工作流:
主线程是一个循环,每一帧使用 epoll_wait 等待事件发生,有可读写事件或请求连接事件时处理这些事件,将可读事件的内容 recv出来创建一个消息对象,放入事件处理队列中,只不过请求连接事件是直接在主线程处理的。
CMain 对象初始化时就创建 pack_deal 工作线程池。工作线程则每一帧从工作队列里取出消息对象及 sock 进行处理,处理完之后再写入一个消息对象放在队列中,再由主线程 send 出去。
主要不同是读取和发送还有创建连接是在主线程,然后工作线程池容量可以在 配置文件 .conf 中配置,日常只是配置一个。

大多数游戏服的同时在线人数在 20 左右,最高的新区可达60人,老区每天大概30人,所以使用这种模型足够应付。

发布react到服务器 相关内容

2016-12-20 21:41:35 drdairen 阅读数 2500

Reactor模式是一个架构模式,它主要解决的问题是高并发场景下的服务器的性能问题

原来的服务器与客户端的链接是一对一的,也就是说一个客户端socket接到后,对应一个线程去接收和处理,这种模式的好处,是思路很清晰,一个线程处理一个socket请求,但是这种太消耗线程资源,因为毕竟socket不是实时都有数据接入的,例如网卡,就是属于是典型的慢速设备。
因此,如何能高效的利用一些模式,可以改变这种局面,这个时候Reactor模式就出来了:
基于socket的Reactor模式
上图就是基于socket的reactor模式,socket作为文件fd,被多路事件分离器监听,这个多路事件分离器一般就是指Select/poll这种IO多路复用的机制,select/poll将事件分离出来后,委托给反应器去寻找创建对应的事件处理器,当创建完成后,以后有socket事件进来,直接由select分离出来,交由对应的Handler去处理。

Reactor模式其实最初其实来源于 华盛顿大学 Douglas C.Schmdt的一篇论文,最初用于解决分布式日志系统存在慢速设备的问题。论文中的类图如下:
这里写图片描述
标准的Reactor模式图一共有5个角色:
1、描述符(handle):
由操作系统提供,用于识别每一个事件,如Socket描述符、文件描述符fd等。在Linux中,它用一个整数来表示。
事件可以来自外部,如来自客户端的连接请求、数据等。事件也可以来自内部,如定时器事件。

2、同步事件分离器(demultiplexer):

是一个函数,用来等待一个或多个事件的发生。调用者会被阻塞,直到分离器分离的描述符集上有事件发生。
Linux的select/poll/epoll函数是一个经常被使用的分离器。

3、事件处理器接口(event handler):

是由一个或多个模板函数组成的接口。这些模板函数描述了和应用程序相关的对某个事件的操作。

4、具体的事件处理器:

是事件处理器接口的实现。它实现了应用程序提供的某个服务。也就是libev中的回调函数callback函数。
每个具体的事件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服务。

5、Reactor 管理器(reactor):

定义了一些接口,用于应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核心。(如libev中的loop)
Reactor管理器使用同步事件分离器来等待事件的发生。
一旦事件发生,Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模 板函数来处理这个事件。

其实对Reactor实现的最好实现在C++的ACE框架上,ACE框架用于替代传统socket这种编程方式的,提供了很多封装的模式的框架,其中有一个模式,就叫Reactor框架。
对应:
Reactor对应的就是ACE的Reactor,负责handler注册,和整个Reactor模式的管控;
具体的事件源,也就是handle,其实也就是对应的fd,和socket的文件描述符等事件产生源;
多路复用事件分离器,其实就是操作系统中的select,poll等机制
Event Handler接口就是对应的事件接口,ACE将这个Reactor的接口标准化了,供应用自己去实现自己的Event Handler;
Application Event Handler是对应的应用的事件Handler,继承自Event Handler接口;
对于ACE Reactor框架,还提供了诸如定时任务队列等的支持,功能可谓非常强大。

在Java中,其实大家都非常比较清楚了,NIO其实也比较类似Reactor模式

Selector可以理解为操作系统底层映射到java API层级的一个多路复用事件的分离器。
SelectionKey其实就是对应的事件,如socket read,socket listen,accept等等。
当对应的事件触发之后,NIO需要你自己针对于SelectionKey中的事件进行if else的判断,而这部分就没有用到Handler,
当然,如ServerSocketChannel,FileChannel对应的事件就那么几个,完全自己可以封装一个完美的Reactor框架。

在我们的libev事件驱动的网络库来说,上述reactor模型的5个角色也都有很明显的对应,类似c++的ACE框架。

总结一下,Reactor模式产生的年头较早,它最初解决的问题是一客户端连接一线程模型中,针对于慢速客户端如打印机等的线程时间片的浪费问题,
后期逐渐演变为服务器前端架构模式,ACE,libev,JAVA的NIO都是基于Reactor的实现案例。

发布react到服务器 相关内容

2012-08-30 00:19:45 xtwolf008 阅读数 8639

JAVA NIO 多线程服务器 1.2版
Reactor 模式的 JAVA NIO 多线程服务器

JDK 1.4开始,Java的标准库中就包含了NIO,即所谓的“New IO”。其中最重要的功能就是提供了“非阻塞”的IO,当然包括了Socket。NonBlocking的IO就是对select(Unix平台下)以及 WaitForMultipleObjects(Windows平台)的封装,提供了高性能、易伸缩的服务架构。

说来惭愧,直到JDK1.4才有这种功能,但迟到者不一定没有螃蟹吃,NIO就提供了优秀的面向对象的解决方案,可以很方便地编写高性能的服务器。

话说回来,传统的Server/Client实现是基于Thread per request,即服务器为每个客户端请求建立一个线程处理,单独负责处理一个客户的请求。比如像Tomcat(新版本也会提供NIO方案)、Resin等Web服务器就是这样实现的。当然为了减少瞬间峰值问题,服务器一般都使用线程池,规定了同时并发的最大数量,避免了线程的无限增长。

但这样有一个问题:如果线程池的大小为100,当有100个用户同时通过HTTP现在一个大文件时,服务器的线程池会用完,因为所有的线程都在传输大文件了,即使第101个请求者仅仅请求一个只有10字节的页面,服务器也无法响应了,只有等到线程池中有空闲的线程出现。

另外,线程的开销也是很大的,特别是达到了一个临界值后,性能会显著下降,这也限制了传统的Socket方案无法应对并发量大的场合,而“非阻塞”的IO就能轻松解决这个问题。

下面只是一个简单的例子:服务器提供了下载大型文件的功能,客户端连接上服务器的12345端口后,就可以读取服务器发送的文件内容信息了。注意这里的服务器只有一个主线程,没有其他任何派生线程,让我们看看NIO是如何用一个线程处理N个请求的。

NIO服务器最核心的一点就是反应器模式:当有感兴趣的事件发生的,就通知对应的事件处理器去处理这个事件,如果没有,则不处理。所以使用一个线程做轮询就可以了。当然这里这是个例子,如果要获得更高性能,可以使用少量的线程,一个负责接收请求,其他的负责处理请求,特别是对于多CPU时效率会更高。

关于使用NIO过程中出现的问题,最为普遍的就是为什么没有请求时CPU的占用率为100%?出现这种问题的主要原因是注册了不感兴趣的事件,比如如果没有数据要发到客户端,而又注册了写事件(OP_WRITE),则在 Selector.select()上就会始终有事件出现,CPU就一直处理了,而此时select()应该是阻塞的。

线程模型

NIO 的选择器采用了多路复用(Multiplexing)技术,可在一个选择器上处理多个套接字,通过获取读写通道来进行 IO 操作。由于网络带宽等原因,在通道的读、写操作中是容易出现等待的,所以在读、写操作中引入多线程,对性能提高明显,而且可以提高客户端的感知服务质量。所以本文的模型将主要通过使用读、写线程池来提高与客户端的数据交换能力。

如下图所示,服务端接受客户端请求后,控制线程将该请求的读通道交给读线程池,由读线程池分配线程完成对客户端数据的读取操作;当读线程完成读操作后,将数据返回控制线程,进行服务端的业务处理;完成业务处理后,将需回应给客户端的数据和写通道提交给写线程池,由写线程完成向客户端发送回应数据的操作。


(NIO 多线程服务器模型)

同时整个服务端的流程处理,建立于事件机制上。在 [接受连接->读->业务处理->写 >关闭连接 ]这个过程中,触发器将触发相应事件,由事件处理器对相应事件分别响应,完成服务器端的业务处理。
下面我们就来详细看一下这个模型的各个组成部分。

 

 

public class MiniServer extends Thread
{
    private static final Log log = LogFactory.getLog(MiniServer.class);
   
    private final Selector s;
    private final ServerSocketChannel ssc;
    private ExecutorService executor;
   
    public MiniServer(int portnumber,ExecutorService executor) throws IOException
    {
        this.executor=executor;
        s = Selector.open();
        ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(portnumber));
        ssc.configureBlocking(false);
        ssc.register(s,SelectionKey.OP_ACCEPT);
    }
   
    public void run()
    {
        try
        {
            while(s.isOpen())
            {
                int nKeys=s.select();
                if(nKeys>0)
                {
                    Iterator<SelectionKey> it = s.selectedKeys().iterator();
                    while (it.hasNext())
                    {
                        SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid() || !key.channel().isOpen())
                            continue;
                        if(key.isAcceptable())
                        {
                            SocketChannel sc = ssc.accept();
                            if (sc != null)
                            {
                                sc.configureBlocking(false);
                                sc.register(s, SelectionKey.OP_READ, new Reader(executor));
                            }
                        }
                        else if(key.isReadable()||key.isWritable())
                        {
                            Reactor reactor = (Reactor) key.attachment();
                            reactor.execute(key);
                        }
                    }
                }
            }
        }
        catch(IOException e)
        {
            log.info(e);
        }
    }
}


public interface Reactor
{
    void execute(SelectionKey key);
}


public class Reader implements Reactor
{
    private static final Log log = LogFactory.getLog(Reader.class);
   
    private byte[] bytes=new byte[0];
    private ExecutorService executor;
   
    public Reader(ExecutorService executor)
    {
        this.executor=executor;
    }
   
    @Override
    public void execute(SelectionKey key)
    {
        SocketChannel sc = (SocketChannel) key.channel();
        try
        {
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            int len=-1;
            while(sc.isConnected() && (len=sc.read(buffer))>0)
            {
                buffer.flip();
                  byte [] content = new byte[buffer.limit()];
                buffer.get(content);
                bytes=NutUtil.ArrayCoalition(bytes,content);
                buffer.clear();
            }
            if(len==0)
            {
                key.interestOps(SelectionKey.OP_READ);
                key.selector().wakeup();
            }
            else if(len==-1)
            {
                Callable<byte[]> call=new ProcessCallable(bytes);
                Future<byte[]> task=executor.submit(call);
                ByteBuffer output=ByteBuffer.wrap(task.get());
                sc.register(key.selector(), SelectionKey.OP_WRITE, new Writer(output));
            }
        }
        catch(Exception e)
        {
            log.info(e);
        }
    }
}


public class Writer implements Reactor
{
    private static final Log log = LogFactory.getLog(Writer.class);
   
    private ByteBuffer output;
   
    public Writer(ByteBuffer output)
    {
        this.output=output;
    }
   
    public void execute(SelectionKey key)
    {
        SocketChannel sc = (SocketChannel) key.channel();
        try
        {
            while(sc.isConnected() && output.hasRemaining())
            {
                int len=sc.write(output);
                if(len<0)
                {
                    throw new EOFException();
                }
                if(len==0)
                {
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.selector().wakeup();
                    break;
                }
            }
            if(!output.hasRemaining())
            {
                output.clear();
                key.cancel();
                sc.close();
            }
        }
        catch(IOException e)
        {
            log.info(e);
        }
    }
}

 

发布react到服务器 相关内容

2015-11-15 11:22:31 FebruarySwallow 阅读数 559

在服务器端使用Reactor框架

使用Reactor框架的服务器端结构如下:

服务器端注册两种事件处理器,Cli_acceptor和Cli_server ,Cli_server类负责和客户端的通信,每一个Cli_server对象对应一个客户端的Socket连接。 Cli_acceptor专门负责被动接受客户端的连接,并创建Cli_server对象。这样,在一个N个Socket连接的服务器程序中,将存在1个Cli_acceptor对象和N个Cli_server对象。

整个服务器端流程如下:

首先创建一个Cli_acceptor对象,该对象在Reactor上注册ACCEPT_MASK事件,Reactor将自动在监听端口建立Socket监听。
如果有对该端口的Socket连接时,Reactor将自动回调handle_input方法,Cli_acceptor重载此方法,并创建一个Cli_server对象,用于处理和Client的通信。
Cli_server对象根据服务器的具体功能实现,其处理过程和客户端程序类似,注册相应的回调事件并分发即可。

下面为Cli_server.h:`

#ifndef GGGGG
#define GGGGG
#include"ace/Event_Handler.h"
#include"ace/Log_Msg.h"
#include"ace/SOCK_Stream.h"
#include"ace/Reactor.h"
class Cli_server : public ACE_Event_Handler
{
    public:
        Cli_server();
        ~Cli_server();
        const ACE_SOCK_Stream &get_Stream();
        void set_ptr(Cli_server*);
        void set_Stream(const ACE_SOCK_Stream peer);
        int register_read();
        virtual int handle_input(ACE_HANDLE fd);
        virtual ACE_HANDLE get_handle() const;
        virtual int handle_close(ACE_HANDLE ,ACE_Reactor_Mask close_mask);
    private:
        ACE_SOCK_Stream peer;
        Cli_server *cliser;
        bool nd_dt;
};
#endif`

先解释三个成员变量, peer负责真正的通信,cliser和nd_dt是为了释放资源(可能我用的是虚拟机,ACE中的智能指针貌似用不了,只能手动释放资源),若果nd_dt为true,就需要释放。这个到用的时候会详细解释。

构造函数和析构函数就不说了。
get_Stream():函数可以省略不管,没什么用。

set_Stream():用来设置peer的值,这个函数用一个ACE_SOCK_Stream对象作为参数,注意这个 按值传递,具体原因后面会解释。

register_read():用来注册读事件,将peer的读事件注册到反应器。

handle_input():十分重要的函数,当我们注册的事件发生时,会调用这个函数。这个函数从父类
ACE_Event_Handler继承而来,你必须重写它。

handle_close():在其他handle_*()挂钩方法中的一个返回-1时,或是在ACE_Reactor::remove_handler()被显式调用来解除事件处理器的登记时,执行用户定义的终止活动的挂钩方法

get_handle():这个函数同handle_input()继承自父类,你必须重载它,这个函数后面的const修饰词必须加上(C++多态,方法后是否有const也是一种重载),这个函数会在注册的时候用上,这个函数返回实际的I/O handle,我们这个类即返回peer的handle

下面为Cli_server.cpp:

#include"Cli_server.h"
const ACE_SOCK_Stream& Cli_server::get_Stream() //可以忽略
{
    ACE_DEBUG((LM_DEBUG,ACE_TEXT("get_Stream() : \n")));
    return peer;
}

void Cli_server::set_Stream(const ACE_SOCK_Stream  peer)
{
    ACE_DEBUG((LM_DEBUG,ACE_TEXT("set_Stream() : \n")));
    this->peer=peer;       //peer赋值,按值传递
}

int Cli_server::register_read()
{
    ACE_DEBUG((LM_DEBUG,ACE_TEXT("register_read() : \n")));
    return ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);
    //注册读事件,在这个函数执行中,会调用this->get_handle()这个语句,将实际的handle与事件   相关联,若get_handle()函数返回错误的handle或者后面没有const修饰词,就会造成注册失败
}

ACE_HANDLE Cli_server::get_handle() const
{
    return peer.get_handle();//返回peer的handle,即实际做事情的handle
}

int Cli_server::handle_input(ACE_HANDLE fd)
{
    char recvbuf[100]={};  //测试用代码,这个可以随意写
    if(peer.recv(recvbuf,sizeof(recvbuf))<0)
    {
        ACE_DEBUG((LM_ERROR,ACE_TEXT("recv error ! \n")));
        return -1;
    }
    char sendbuf[20]="hello world!";
    peer.send_n(sendbuf,sizeof(sendbuf));
    return 0;
}
int Cli_server::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask)
{
    close_mask=ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
    this->reactor()->remove_handler(this,close_mask);
    peer.close();
    return 0;
}

Cli_server::Cli_server():cliser(NULL),nd_dt(false)
{

}

void Cli_server::set_ptr(Cli_server * ptr)
{
    nd_dt=true;   //这个函数是为了释放内存
    cliser=ptr;
}
Cli_server::~Cli_server()
{
    if(nd_dt)
      delete cliser;  //释放内存
}

下面为Cli_acceptor.h:

#ifndef HHHHHHHHHHHHHH
#define HHHHHHHHHHHHHH
#include"ace/Log_Msg.h"
#include"ace/Event_Handler.h"
#include"ace/SOCK_Acceptor.h"
#include"ace/INET_Addr.h"
#include"Cli_server.h"
#include<memory>
class Cli_acceptor : public ACE_Event_Handler
{
    public:
        ~Cli_acceptor();
        const ACE_SOCK_Acceptor& get_Acceptor();
        void set_Acceptor(const ACE_SOCK_Acceptor& );
        virtual ACE_HANDLE get_handle() const;
        int open(const ACE_INET_Addr &);
        virtual int handle_input(ACE_HANDLE);
        virtual int handle_close(ACE_HANDLE,ACE_Reactor_Mask);
    private:
        ACE_SOCK_Acceptor acceptor;
};
#endif

成员变量为一个ACE_SOCK_Acceptor对象,来完成实际的监听工作和接收连接。
handle_input(),handle_close(),get_handle()就不说了。
get_Acceptor()可以忽略
open():这个函数执行bind(),listen()操作,其实就是实际执行acceptor.open()函数

下面为Cli_acceptor.cpp:

#include"Cli_acceptor.h"
const ACE_SOCK_Acceptor& Cli_acceptor::get_Acceptor()
{
    return acceptor;
}

void Cli_acceptor::set_Acceptor(const ACE_SOCK_Acceptor&acceptor)
{
    this->acceptor=acceptor;
}

int Cli_acceptor::open(const ACE_INET_Addr &addr)
{
    if(acceptor.open(addr,1)==-1) //执行acceptor.open()函数
    {
        ACE_DEBUG((LM_ERROR,ACE_TEXT("Cli_acceptor :: open error!\n")));
        return -1;
    }
    return ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);
    //注册事件
}


int Cli_acceptor::handle_input(ACE_HANDLE fd)
{
    ACE_SOCK_Stream peer;  //这里就是为什么传值而不是传引用,若为引用的话这里就必须为一个指针或者静态变量,但都不符合要求。(智能指针我电脑不能用,静态变量,呵呵)所以这里我用的是传值,当然这里最后用指针动态分配,你们可以自己试一下
    if(acceptor.accept(peer)!=0)
    {
        ACE_DEBUG((LM_ERROR,ACE_TEXT("ACE_acceptor :: handle_input error!\n")));
        return -1;
    }
    Cli_server *cliser=new Cli_server;   //这个。一个连接对应一个Cli_server对象,这个必须要动态生成了,但是没有地方释放资源,所以就有了set_ptr()这个函数和那两个成员变量,就是为了释放这里的资源
    cliser->set_ptr(cliser);
    cliser->set_Stream(peer);//设置peer
    cliser->register_read();//注册读事件
    return 0;
}

int Cli_acceptor::handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask)
{
    close_mask=ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL;
    this->reactor()->remove_handler(this,close_mask);
    acceptor.close();
}

Cli_acceptor::~Cli_acceptor()
{
    this->handle_close(ACE_INVALID_HANDLE,0);
}

ACE_HANDLE Cli_acceptor::get_handle() const
{
    return acceptor.get_handle();
}

好了两个类已经介绍完了,下面就是main()函数中的内容,当你完成两个类后,就会发现,main()函数很好写,下面为ser.cpp:

#include"ace/Log_Msg.h"
#include"ace/SOCK_Acceptor.h"
#include"ace/SOCK_Stream.h"
#include"ace/INET_Addr.h"
#include"ace/Reactor.h"
#include"ace/SOCK_Connector.h"
#include"Cli_acceptor.h"
int ACE_TMAIN(int argc,ACE_TCHAR**argv)
{
    Cli_acceptor acceptor;          //一个接受连接的对象
    ACE_INET_Addr addr(6666);       //IP地址
    acceptor.open(addr);            //完成bind()和listen()
    while(1)
      ACE_Reactor::instance()->handle_events();//激发事件轮询
    return 1;
}

ACE的事件注册,我大概有一个猜想,但是我不敢肯定,我会查找一些资料再说

发布react到服务器 相关内容

2017-11-15 11:50:09 xiaokaige198747 阅读数 164

本文针对Reactor模式从四个方面进行了阐述,首先简单介绍了Reactor模式是什么;其次,阐述了为什么使用Reactor模式;再次,针对实际生活的应用场景,分析了在什么场景下使用Reactor模式;最后,着重分析讲解了如何使用Reactor模式,以及代码示例。


1、What:Reactor模式是什么?

          反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。 Reactor模式主要包含下面几部分内容。

          初始事件分发器(Initialization Dispatcher):用于管理Event Handler,定义注册、移除EventHandler等。它还作为Reactor模式的入口调用Synchronous Event Demultiplexer的select方法以阻塞等待事件返回,当阻塞等待返回时,根据事件发生的Handle将其分发给对应的Event Handler处理,即回调EventHandler中的handle_event()方法

          同步(多路)事件分离器(Synchronous Event Demultiplexer):无限循环等待新事件的到来,一旦发现有新的事件到来,就会通知初始事件分发器去调取特定的事件处理器。

          系统处理程序(Handles):操作系统中的句柄,是对资源在操作系统层面上的一种抽象,它可以是打开的文件、一个连接(Socket)、Timer等。由于Reactor模式一般使用在网络编程中,因而这里一般指Socket Handle,即一个网络连接(Connection,在Java NIO中的Channel)。这个Channel注册到Synchronous Event Demultiplexer中,以监听Handle中发生的事件,对ServerSocketChannnel可以是CONNECT事件,对SocketChannel可以是READ、WRITE、CLOSE事件等。

          事件处理器(Event Handler): 定义事件处理方法,以供Initialization Dispatcher回调使用。

          对于Reactor模式,可以将其看做由两部分组成,一部分是由Boss组成,另一部分是由worker组成。Boss就像老板一样,主要是拉活儿、谈项目,一旦Boss接到活儿了,就下发给下面的work去处理。也可以看做是项目经理和程序员之间的关系。

2、Why:为什么使用Reactor模式?

Part A:

          对于一个事件驱动的分布式日志登录服务系统,如下图1所示。

这里写图片描述

          客户端应用通过日志服务来录入它们当前状态和记录,这些状态可记录可能包含了错误通知信息、断点调试信息等。日志记录被发送到一个中央日服务器上,该服务器可以处理日志和连接用户请求。客户端想要记录日志信息,首先必须发送一个连接请求给服务器。服务器通过一个“处理工厂”来监听客户端对应的地址信息,以等待这些连接请求的到来。当一个连接请求到来时,“处理工厂”就创建一个handle,其代表了连接的端点,用来建立客户端和服务器之间的连接。当handle收到来自客户端的请求连接时,就会返回给服务器。一旦客户端连接成功,它们就可以同时发送日志记录到服务器。

Part B:

          或许最有效的方法来开发一个并发日志系统是使用多线程,这样可以同时处多个理客户端请求,如下图2所示。

这里写图片描述

       然而,多线程实现这样的分布式日志系统可能会面临下面的问题: 
       可用性:服务器必须能够处理传入请求即使是等待其他请求到达的。特别是,一个服务器不能无限期地处理任何单一来源的事件而排斥其他事件源。因为这可能大大延迟响应其他客户的时间。 
       效率:一个服务器应该做到延迟最小化、吞吐量最大化,避免不必要地使用CPU。多线程可能会导致糟糕的性能由于上下文切换、同步和数据移动。 
       编程简洁:服务器的设计上应该简化使用合适的并发策略。多线程可能需要复杂的并发控制方案。 
       可移植性:多线程不是可用在所有操作系统平台。 
       适应性:集成新的或改进服务,如改变消息格式或添加服务器端缓存,应该承担最小的现有代码的修改和维护成本。例如,实现新应用程序服务应该不需要修改通用事件多路分解和调度机制。

Part C:

       针对上面的问题,可以集成同步多路分解事件并分发相应的事件处理程序来处理相应的事件。对于每一个应用程序所提供的服务,引入一个单独的事件处理器处理某些类型的事件。所有事件处理程序实现了相同的接口。事件处理程序注册一个初始调度程序,它使用一个同步事件信号分离器等待事件发生。当事件发生时,同步事件信号分离器通知初始调度器,它同步告知事件处理程序去关联对应的事件。事件处理程序然后分派事件到实现了所请求服务的方法中。

       上述日志系统的Reactor模式类图如下所示:

这里写图片描述

       客户端连接到日志服务器所经过的一系列步骤如下图所示:

这里写图片描述

       日志服务器记录日志所经过的一系列步骤如下图所示:

这里写图片描述

3、Where:什么场景下使用Reactor模式?

         对于高并发系统,常会使用Reactor模式,其代替了常用的多线程处理方式,节省系统的资源,提高系统的吞吐量。下面用比较直观的形式来介绍这种模式的使用场景。 
        以餐厅为例,每一个人就餐就是一个事件,顾客会先看下菜单,然后点餐,处理这些就餐事件需要服务人员。就像一个网络服务会有很多的请求,服务器会收到每个请求,然后指派工作线程去处理一样。 
         在多线程处理方式下: 
         一个人来就餐,一个服务员去服务,然后客人会看菜单,点菜。 服务员将菜单给后厨。 
         二个人来就餐,二个服务员去服务…… 
         五个人来就餐,五个服务员去服务……

         这类似多线程的处理方式,一个事件到来,就会有一个线程为其服务。很显然这种方式在人少的情况下会有很好的用户体验,每个客人都感觉自己享有了最好的服务。如果这家餐厅一直这样同一时间最多来5个客人,这家餐厅是可以很好的服务下去的。

         由于这家店的服务好,吃饭的人多了起来。同一时间会来10个客人,老板很开心,但是只有5个服务员,这样就不能一对一服务了,有些客人就不能马上享有服务员为其服务了。老板为了挣钱,不得不又请了5个服务员。现在又好了,每位顾客都享受最好最快的待遇了。

         越来越多的人对这家餐厅满意,客源又多了,同时来吃饭的人到了20人,老板高兴但又高兴不起来了,再请服务员吧,占地方不说,还要开工钱,再请人就挣不到到钱了。

         怎么办呢?老板想了想,10个服务员对付20个客人也是能对付过来的,服务员勤快点就好了,伺候完一个客人马上伺候另外一个,还是来得及的。综合考虑了一下,老板决定就使用10个服务人员的线程池!

          但是这样又有一个比较严重的缺点:如果正在接受服务员服务的客人点菜很慢,其他的客人可能就要等好长时间了。有些脾气火爆的客人可能就等不了走人了。

         这样,我么那就引入了Reactor模式,那么,Reactor模式是如何处理这个问题呢?

         老板后来发现,客人点菜比较慢,大部服务员都在等着客人点菜,其实干的活不是太多。老板之所以能当老板当然有点不一样的地方,终于发现了一个新的方法,那就是:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声“服务员”,马上就有个服务员过去服务。在用了这个新方法后,老板进行了一次裁员,只留了一个服务员!这就是用单个线程来做多线程的事。实际的餐馆都是用的Reactor模式在服务。

4、How:如何使用Reactor模式?

          在网络服务和分布式对象中,对于网络中的某一个请求处理,我们比较关注的内容大致为:读取请求( Read request)、 解码请求(Decode request)、处理服务(Process service)、 编码答复(Encode reply)、 发送答复(Send reply)。但是每一步对系统的开销和效率又不尽相同

A、Classic Service Design

          对于传统的服务设计,每一个到来的请求,系统都会分配一个线程去处理,这样看似合乎情理,但是,当系统请求量瞬间暴增时,会直接把系统拖垮。因为在高并发情况下,系统创建的线程数量是有限的。传统系统设计如下图所示:

这里写图片描述

          传统的服务代码实现如下所示:

    class Server implements Runnable {
        public void run() {
            try {
                //创建服务端连接
                ServerSocket ss = new ServerSocket(PORT);
                //不停创建线程处理新的请求
                while (!Thread.interrupted())
                    new Thread(new Handler(ss.accept())).start();
                // or, single-threaded, or a thread pool
            } catch (IOException ex) {
                /* ... */ }
        }

        //处理请求的handler
        static class Handler implements Runnable {
            final Socket socket;

            Handler(Socket s) {
                socket = s;
            }

            public void run() {
                try {
                    byte[] input = new byte[MAX_INPUT];
                    socket.getInputStream().read(input);
                    byte[] output = process(input);
                    socket.getOutputStream().write(output);
                } catch (IOException ex) {
                    /* ... */ }
            }

            private byte[] process(byte[] cmd) {
                /* ... */ }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

          显然,传统的一对一的线程处理无法满足新需求的变化。对此,考虑到了线程池的使用,这样就使得线程可以被复用,大大降低创建线程和销毁线程的时间。然而,线程池并不能很好满足高并发线程的需求,当海量请求到来时,线程池中的工作线程达到饱和状态,这时可能就导致请求被抛弃,无法完成客户端的请求。对此,考虑到将一次完整的请求切分成几个小的任务,每一个小任务都是非阻塞的;对于读写操作,使用NIO对其进行读写;不同的任务将被分配到与想关联的处理器上进行处理,每个处理器都是通过异步回调机制实现。这样就大大提供系统吞吐量,减少响应时间。这就是下面将要介绍的Reactor模式。

B、Basic Reactor Design

          单线程版的Reactor模式如下图所示。对于客户端的所以请求,都又一个专门的线程去进行处理,这个线程无线循环去监听是否又客户的请求到来,一旦收到客户端的请求,就将其分发给响应的处理器进行处理。

这里写图片描述


Reactor 1: Setup

          在Reactor模式中,我们需要进行一些基本设置,首先需要创建一个Selector和一个ServerSocketChannel ,将监听的端口绑定到Channel中,还需要设置Channel为非阻塞,并在Selector上注册自己感兴趣的时事件,可以是连接事件,也可以是读写事件。代码如下所示:

    //定义reactor,其中包括Selector和ServerSocketChannel 
    //将ServerSocketChannel和事件类型绑定到Seletor上,设置  serverSocket为非阻塞
    class Reactor implements Runnable {
        final Selector selector;
        final ServerSocketChannel serverSocket;

        Reactor(int port) throws IOException {
            selector = Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(port));
            serverSocket.configureBlocking(false);
            SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            sk.attach(new Acceptor());
        }
        /*
         * Alternatively, use explicit SPI provider: SelectorProvider p =
         * SelectorProvider.provider(); selector = p.openSelector();
         * serverSocket = p.openServerSocketChannel();
         */
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

Reactor 2: Dispatch Loop

          下面这段代码可以看做是boss线程,它负责接收请求并安排给对应的handle处理。可以看出,只要当前线程不中断就会一直监听,其中selector.select()是阻塞的,一旦又请求到来时,就会从selector中获取到对应的SelectionKey ,然后将其下发给后续处理程序(工作线程)进行处理。

    // class Reactor continued
    //无限循环等待网络请求的到来
    //其中selector.select();会阻塞直到有绑定到selector的请求类型对应的请求到来,一旦收到事件,处理分发到对应的handler,并将这个事件移除
    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey)(it.next());
                    selected.clear();
                }
        } catch (IOException ex) {
            /* ... */ }
        }

    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null)
            r.run();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

Reactor 3: Acceptor

          Acceptor也是一个线程,在其run方法中,通过判断serverSocket.accept()方法来获取SocketChannel,只要SocketChannel 不为空,则创建一个handler进行相应处理。 

    // class Reactor continued
    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            } catch (IOException ex) {
                /* ... */ }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

这里写图片描述


Reactor 4: Handler setup

          从下方代码可看出,一个handler就是一个线程,其中的SocketChannel 被设置成非阻塞。默认在Selector上注册了读事件并绑定到SocketChannel 上。

    final class Handler implements Runnable {
        final SocketChannel socket;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(MAXIN);
        ByteBuffer output = ByteBuffer.allocate(MAXOUT);
        static final int READING = 0, SENDING = 1;
        int state = READING;

        Handler(Selector sel, SocketChannel c) throws IOException {
            socket = c;
            c.configureBlocking(false);
            // Optionally try first read now
            sk = socket.register(sel, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }

        boolean inputIsComplete() {
            /* ... */ }

        boolean outputIsComplete() {
            /* ... */ }

        void process() {
            /* ... */ }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

Reactor 5: Request handling

          针对不同的请求事件进行处理,代码实现如下所示:

    // class Handler continued
    //具体的请求处理,可能是读事件、写事件等
    public void run() {
        try {
            if (state == READING)
                read();
            else if (state == SENDING)
                send();
        } catch (IOException ex) {
            /* ... */ }
    }

    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete())
            sk.cancel();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27


C、Worker Thread Pools for Reactor

          考虑到工作线程的复用,将工作线程设计为线程池。工作线程使用线程池实现如下图所示。 
这里写图片描述

          在handler中使用线程池来处理任务。代码实现如下所示:

    //这里将具体的业务处理线程设置线程池,提供线程复用
    class Handler implements Runnable {
        // uses util.concurrent thread pool
        static PooledExecutor pool = new PooledExecutor(...);
        static final int PROCESSING = 3;

        // ...
        synchronized void read() { // ...
            socket.read(input);
            if (inputIsComplete()) {
                state = PROCESSING;
                //从线程池中指派线程去完成工作
                pool.execute(new Processer());
            }
        }

        synchronized void processAndHandOff() {
            process();
            state = SENDING; // or rebind attachment
            sk.interest(SelectionKey.OP_WRITE);
        }

        class Processer implements Runnable {
            public void run() {
                processAndHandOff();
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

5、总结

          本文简单阐述了Reactor模型,从Reactor模式是什么到Reactor模式的使用,以及使用Reactor模式的好处。我想如果你了解Netty或者vertx,你一定对Reactor模式有所了解。其中,Netty就是基于Reactor模式搭建的,其是一个异步事件驱动的网络应用框架,感兴趣的同学可以去看看。希望本文对你有所帮助。







发布react到服务器 相关内容

没有更多推荐了,返回首页