新建一个react
2018-07-16 23:40:03 zxm342698145 阅读数 1078

这些天一直在研究网络编程源码,发现很多开源项目都用到了reactor网络模式,例如libevent,skynet,muduo等等。现在对reactor模式也有了一定的认识。

Reactor模式是编写高性能网络服务器的必备技术之一,它具有如下的优点:
1)响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
3)可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;

4)可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;

 

使用Reactor模型,必备的几个组件:事件源、Reactor框架、多路复用机制和事件处理程序,先来看看Reactor模型的整体框架,接下来再对每个组件做逐一说明。

1) 事件源

Linux上是文件描述符,Windows上就是Socket或者Handle了,这里统一称为“句柄集”;程序在指定的句柄上注册关心的事件,比如I/O事件。

2) event demultiplexer——事件多路分发机制

由操作系统提供的I/O多路复用机制,比如select和epoll。程序首先将其关心的句柄(事件源)及其事件注册到event demultiplexer上;

当有事件到达时,event demultiplexer会发出通知“在已经注册的句柄集中,一个或多个句柄的事件已经就绪”;程序收到通知后,就可以在非阻塞的情况下对事件进行处理了。

3) Reactor——反应器

Reactor,是事件管理的接口,内部使用event demultiplexer注册、注销事件;并运行事件循环,当有事件进入“就绪”状态时,调用注册事件的回调函数处理事件。

一个典型的Reactor声明方式如下:

class Reactor  {
public:
    Reactor()  {}
    ~Reactor(){}

    int RegisterHandler(EventHandler* handler, int event);

    int RemoveHandler(EventHandler*);

    void HandleEvents();
private:
    EventDemultiplexer*  m_demultiplexer;
};

4) Event Handler——事件处理程序

事件处理程序提供了一组接口,每个接口对应了一种类型的事件,供Reactor在相应的事件发生时调用,执行相应的事件处理。通常它会绑定一个有效的句柄。

下面是典型的Event Handler类声明方式:

class EventHandler  {
public:
    virtual void HandleRead() = 0;
    virtual void HandleWrite() = 0 ;
    virtual void HandleError() = 0;
   
};

下面是完整的实现:

 

https://github.com/shonm520/my-reactor

由于休陪产假在家,没有办公环境,macbook又不支持linux epoll,家里网络又不给力,费了好大力气下载了vbox,安装Linux虚拟机,整个环境搭建好都到半夜了。macbook写代码调代码还真是没有公司台式机,双显示器方便,第二天也花了很多时间才调通bug,上传至git,记录下。

 

 

 

新建一个react 相关内容

2017-08-16 19:27:00 weixin_33968104 阅读数 10

github

https://github.com/sea-boat/net-reactor

net-reactor

it’s a simple and easy net framework with nio mode written by java

reactor model

reactor

how-to

just simply like:

public class MyHandler implements Handler {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyHandler.class);
    private long readSize;

    /**
     * The logic to deal with the received data.
     *  
     * It means that reactor will trigger this function once the data is received.
     * @throws IOException 
     */
    public void handle(FrontendConnection connection) throws IOException {
        Buffer buff = connection.getReadBuffer();
        readSize = +readSize + buff.position();
        LOGGER.info(connection.getId() + " connection has receive " + readSize);

    }

}
Handler handler = new MyHandler();
ReactorPool reactorPool = new ReactorPool(Runtime.getRuntime().availableProcessors(), handler);
new Acceptor(reactorPool, acceptorName, host, port).start();

adding a connection event or a connection multi-event:

public class RegisterHandler implements ConnectionEventHandler {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(RegisterHandler.class);

    private static int INTERESTED = ConnectionEvents.REGISTE;

    public void event(FrontendConnection connection) {
        if ((event & INTERESTED) != 0) {
            //do something here 
        }
    }

}
Handler handler = new NetHandler();
ConnectionEventHandler connectionEventHandler = new RegisterHandler();
ReactorPool reactorPool = new ReactorPool(Runtime.getRuntime().availableProcessors(), handler);
Acceptor acceptor = new Acceptor(reactorPool, acceptorName, host, port);
acceptor.addConnectionEventHandler(connectionEventHandler);
acceptor.start();
public class ConnectionLogHandler implements ConnectionEventHandler {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(ConnectionLogHandler.class);
    private static int INTERESTED = ConnectionEvents.ACCEPT
            | ConnectionEvents.CLOSE;

    public void event(Connection connection, int event) {
        if ((event & INTERESTED) != 0) {
            if ((event & ConnectionEvents.ACCEPT) != 0)
                LOGGER.info("accept connection,id is " + connection.getId());
            if ((event & ConnectionEvents.CLOSE) != 0)
                LOGGER.info("close connection,id is " + connection.getId());
        }
    }
}

implements the connection

public class XXXConnection extends Connection {

    private String name;

    public XXXConnection(SocketChannel channel, long id, Reactor reactor) {
        super(channel, id, reactor);
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

}
public class XXXConnectionFactory implements ConnectionFactory {

    public XXXConnection createConnection(SocketChannel channel, long id,
            Reactor reactor) {
        return new XXXConnection(channel, id, reactor);
    }

}
Acceptor acceptor = new Acceptor(reactorPool, acceptorName, host,port);
acceptor.setConnectionFactory(new xxxConnectionFactory());

========广告时间========

鄙人的新书《Tomcat内核设计剖析》已经在京东销售了。有须要的朋友能够到 https://item.jd.com/12185360.html 进行预定。

感谢各位朋友。

为什么写《Tomcat内核设计剖析》

=========================

新建一个react 相关内容

2013-05-16 16:59:00 iteye_21202 阅读数 127

与SocketServer TCP客户端不一样的是,这个例子与之前的所有其他客户端看上去都不大一样。它是完全Twisted的。

例16.8 Twisted Reactor Timestamp TCP客户端(tsTclntTW.py)

用Twisted重写我们已经熟悉的时间戳TCP客户端。

1#!/usr/bin/env python
2
3from twisted.internet import protocol, reactor
4
5 HOST=' localhost '
6 PORT=21567
7
8class TSClntProtocol(protocol.Protocol):
9 def sendData(self):
data = raw_input('> ')
11 if data:
12 print '...sending %s...' % data
13 self.transport.write(data)
14 else:
15 self.transport.loseConnection()
16
17 def connectionMade(self):
18 self.sendData()
19
20 def dataReceived(self, data):
21 print data
22 self.sendData()
23
24 class TSClntFactory(protocol.ClientFactory):
25 protocol = TSClntProtocol
26 clientConnectionLost = clientConnectionFailed = \
27 lambda self, connector, reason: reactor.stop()
28
29 reactor.connectTCP(HOST, PORT, TSClntFactory())
30 reactor.run()

逐行解释

1~6行

跟之前所有的客户端程序类似,这里还是导入Twisted的组件。

8~22行

与服务器一样,我们扩展Protocol,重写同样的函数connectionMade()和dataReceived()。这两个函数的用途也跟服务器一样。我们新加一个自己的函数sendData(),用于在需要发送数据时调用。

由于我们现在是客户端,所以我们要主动发起跟服务器的对话。一旦连接建立好之后,我们先发送一个消息,服务器回复这个消息,我们把收到的回复显示在屏幕上,然后再发送其他消息给服务器。

这个过程会一直循环,直到用户没有给任何输入时,连接结束。结束时,就不是调用transport对象的write()函数传数据给服务器了,而是调用loseConnection()函数来关闭套接字。这时,工厂的client ConnectionLost()函数会被调用,同时,reactor就被关闭,脚本的执行就结束了。由于某些原因,clientConnectionFailed()被调用时,reactor也会被关闭。

脚本的最后一部分是创建一个客户端工厂,连接到服务器,然后运行reactor。注意,我们在这里实例化了客户端工厂,而不是像在服务器里那样把它传到reactor中。这是因为,我们不是等待客户端连接的服务器,服务器在有连接时要为每个连接创建一个新的protocol对象。我们只是一个客户端,所以我们只要创建一个protocol对象,连接到服务器,服务器的工厂会创建一个protocol对象来与我们对话。

新建一个react 相关内容

2014-10-23 20:01:36 wsb19871010 阅读数 346

一个简单的ACE Reactor框架的使用

服务端:
#include "iostream"
#include "ace\OS_NS_unistd.h"
#include "ace\INET_Addr.h"
#include "ace\SOCK_Stream.h"
#include "ace\SOCK_Acceptor.h"
#include "ace\Event_Handler.h"
#include "ace\Reactor.h"
#include "ace\Thread_Manager.h"

using namespace std;

class Server : public ACE_Event_Handler
{
public:
	Server(ACE_Reactor *reactor);

	int Open(const char *ip, int port);

	virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
	virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);

protected:
	~Server(){}

private:
	ACE_SOCK_Acceptor acceptor;
	ACE_Thread_Manager threadManager;
};

class ClientHandle : public ACE_Event_Handler
{
public:
	ClientHandle(ACE_HANDLE handle, ACE_Reactor *re);

	virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
	virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);

	ACE_SOCK_Stream &GetSockStream(){ return stream; }

protected:
	~ClientHandle(){}

private:
	ACE_SOCK_Stream stream;
};

ACE_THR_FUNC_RETURN ClientThread(void *param);

int main(int argc, char *argv[])
{
	Server *server = new Server(ACE_Reactor::instance());
	server->Open("192.168.60.65", 9000);

	// 启动消息循环
	ACE_Reactor::instance()->run_event_loop();
	return 0;
}

Server::Server(ACE_Reactor *reactor)
:ACE_Event_Handler(reactor)
{

}

int Server::Open(const char *ip, int port)
{
	ACE_INET_Addr addr(port, ip);
	if (acceptor.open(addr) < 0)
	{
		return -1;
	}

	// 注册接收客户端连接消息
	return reactor()->register_handler(acceptor.get_handle(), this, ACE_Event_Handler::ACCEPT_MASK);
}

int Server::handle_input(ACE_HANDLE fd)
{
	if (fd != acceptor.get_handle())
	{
		return -1;
	}

	ACE_SOCK_Stream stream;
	if (acceptor.accept(stream) < 0)
	{
		return -1;
	}

	ClientHandle *clientHandle = new ClientHandle(stream.get_handle(), reactor());
	threadManager.spawn(ClientThread, clientHandle);

	return 0;
}

int Server::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
	acceptor.close();
	delete this;
	return 0;
}

ClientHandle::ClientHandle(ACE_HANDLE handle, ACE_Reactor *re)
{
	stream.set_handle(handle);
	reactor(re);
}

int ClientHandle::handle_input(ACE_HANDLE fd)
{
	if (stream.get_handle() != fd)
	{
		return -1;
	}

	char buf[1024] = { 0 };
	if (stream.recv(buf, 1024) <= 0)
	{
		return -1;
	}
	cout << "Client:" << buf << endl;

	return 0;
}

int ClientHandle::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
	stream.send("bye client", strlen("bye client"));
	stream.close();
	delete this;
	return 0;
}

ACE_THR_FUNC_RETURN ClientThread(void *param)
{
	ClientHandle *clientHandle = (ClientHandle *)param;
	clientHandle->reactor()->register_handler(clientHandle->GetSockStream().get_handle(), clientHandle, ACE_Event_Handler::READ_MASK);
	
	ACE_OS::sleep(2);
	clientHandle->GetSockStream().send("hello client", strlen("hello client"));
	ACE_OS::sleep(2);
	clientHandle->reactor()->remove_handler(clientHandle->GetSockStream().get_handle(), ACE_Event_Handler::READ_MASK);
	ACE_OS::sleep(2);

	return 0;
}

客户端:
#include "iostream"
#include "ace\INET_Addr.h"
#include "ace\SOCK_Stream.h"
#include "ace\SOCK_Connector.h"
#include "ace\Reactor.h"

using namespace std;

class Client : public ACE_Event_Handler
{
public:
	Client(ACE_Reactor *reactor);

	virtual int handle_input(ACE_HANDLE fd = ACE_INVALID_HANDLE);
	virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);

	int Connect(char *ip, int port);
	int Send(char *buf, int len);

protected:
	~Client(){}

private:
	ACE_SOCK_Stream stream;
	ACE_SOCK_Connector connector;
};

int main(int argv, char *argc[])
{
	Client *client = new Client(ACE_Reactor::instance());
	if (client->Connect("192.168.60.65", 9000) < 0)
	{
		return 0;
	}

	client->Send("hello server", strlen("hello server"));

	// 启动消息循环
	ACE_Reactor::instance()->run_event_loop();
	return 0;
}

Client::Client(ACE_Reactor *reactor)
:ACE_Event_Handler(reactor)
{

}

int Client::Connect(char *ip, int port)
{
	ACE_INET_Addr addr(port, ip);
	// 连接服务
	if (connector.connect(stream, addr) < 0)
	{
		return -1;
	}
	// 注册读事件
	reactor()->register_handler(stream.get_handle(), this, ACE_Event_Handler::READ_MASK);

	return 0;
}

int Client::Send(char *buf, int len)
{
	return stream.send(buf, len);
}

int Client::handle_input(ACE_HANDLE fd)
{
	char buf[1024] = { 0 };
	if (stream.recv(buf, 1024) <= 0)
	{
		return -1;
	}
	cout << "Server:" << buf << endl;

	return 0;
}

int Client::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
	stream.close();
	delete this;
	return 0;
}


新建一个react 相关内容

2017-03-30 08:48:54 weixin_33691700 阅读数 11

github

https://github.com/sea-boat/net-reactor

net-reactor

it’s a simple and easy net framework with nio mode written by java

reactor model

reactor

how-to

just simply like:

public class MyHandler implements Handler {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyHandler.class);
    private long readSize;

    /**
     * The logic to deal with the received data.
     *  
     * It means that reactor will trigger this function once the data is received.
     * @throws IOException 
     */
    public void handle(FrontendConnection connection) throws IOException {
        Buffer buff = connection.getReadBuffer();
        readSize = +readSize + buff.position();
        LOGGER.info(connection.getId() + " connection has receive " + readSize);

    }

}
Handler handler = new MyHandler();
ReactorPool reactorPool = new ReactorPool(Runtime.getRuntime().availableProcessors(), handler);
new Acceptor(reactorPool, acceptorName, host, port).start();

adding a connection event or a connection multi-event:

public class RegisterHandler implements ConnectionEventHandler {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(RegisterHandler.class);

    private static int INTERESTED = ConnectionEvents.REGISTE;

    public void event(FrontendConnection connection) {
        if ((event & INTERESTED) != 0) {
            //do something here 
        }
    }

}
Handler handler = new NetHandler();
ConnectionEventHandler connectionEventHandler = new RegisterHandler();
ReactorPool reactorPool = new ReactorPool(Runtime.getRuntime().availableProcessors(), handler);
Acceptor acceptor = new Acceptor(reactorPool, acceptorName, host, port);
acceptor.addConnectionEventHandler(connectionEventHandler);
acceptor.start();
public class ConnectionLogHandler implements ConnectionEventHandler {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(ConnectionLogHandler.class);
    private static int INTERESTED = ConnectionEvents.ACCEPT
            | ConnectionEvents.CLOSE;

    public void event(Connection connection, int event) {
        if ((event & INTERESTED) != 0) {
            if ((event & ConnectionEvents.ACCEPT) != 0)
                LOGGER.info("accept connection,id is " + connection.getId());
            if ((event & ConnectionEvents.CLOSE) != 0)
                LOGGER.info("close connection,id is " + connection.getId());
        }
    }
}

implements the connection

public class XXXConnection extends Connection {

    private String name;

    public XXXConnection(SocketChannel channel, long id, Reactor reactor) {
        super(channel, id, reactor);
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

}
public class XXXConnectionFactory implements ConnectionFactory {

    public XXXConnection createConnection(SocketChannel channel, long id,
            Reactor reactor) {
        return new XXXConnection(channel, id, reactor);
    }

}
Acceptor acceptor = new Acceptor(reactorPool, acceptorName, host,port);
acceptor.setConnectionFactory(new xxxConnectionFactory());

========广告时间========

鄙人的新书《Tomcat内核设计剖析》已经在京东销售了,有需要的朋友可以到 https://item.jd.com/12185360.html 进行预定。感谢各位朋友。

为什么写《Tomcat内核设计剖析》

=========================

新建一个react 相关内容

没有更多推荐了,返回首页