反应器(reactor)模式
2013-02-21 16:53:52 chexlong 阅读数 1336
  1. 通常的,对一个文件描述符指定的文件或设备, 有两种工作方式: 阻塞与非阻塞。所谓阻塞方式的意思是指, 当试图对该文件描述符进行读写时, 如果当时没有东西可读,或者暂时不可写, 程序就进入等待状态, 直到有东西可读或者可写为止。而对于非阻塞状态, 如果没有东西可读, 或者不可写, 读写函数马上返回, 而不会等待。    
  2.   
  3.     在前面的章节中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:当接收tcp数据时,如果远端没有数据可以读,则会一直阻塞到读到需要的数据为止。这种方式的传输和传统的被动方法的调用类似,非常直观,并且简单有效,但是同样也存在一个效率问题,如果你是开发一个面对着数千个连接的服务器程序,对每一个客户端都采用阻塞的方式通信,如果存在某个非常耗时的读写操作时,其它的客户端通信将无法响应,效率非常低下。    
  4.   
  5.     一种常用做法是:每建立一个Socket连接时,同时创建一个新线程对该Socket进行单独通信(采用阻塞的方式通信)。这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果对每一个连接都产生一个线程的无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况。    
  6.   
  7.     另一种较高效的做法是:服务器端保存一个Socket连接列表,然后对这个列表进行轮询,如果发现某个Socket端口上有数据可读时(读就绪),则调用该socket连接的相应读操作;如果发现某个Socket端口上有数据可写时(写就绪),则调用该socket连接的相应写操作;如果某个端口的Socket连接已经中断,则调用相应的析构方法关闭该端口。这样能充分利用服务器资源,效率得到了很大提高。   
  8.   
  9.     
  10.   
  11.      系统I/O方式可分为阻塞,非阻塞同步和非阻塞异步三类,三种方式中,非阻塞异步模式的扩展性和性能最好。主要是讲了两种IO多路复用模式:Reactor和Proactor,并对它们进行了比较。   
  12.   
  13. 两种I/O多路复用模式:Reactor和Proactor   
  14.   
  15.      一般地,I/O多路复用机制都依赖于一个事件多路分离器(Event Demultiplexer)。分离器对象可将来自事件源的I/O事件分离出来,并分发到对应的read/write事件处理器(Event Handler)。开发人员预先注册需要处理的事件及其事件处理器(或回调函数);事件分离器负责将请求事件传递给事件处理器。两个与事件分离器有关的模式是Reactor和Proactor。Reactor模式采用同步IO,而Proactor采用异步IO。   
  16.   
  17.       在Reactor中,事件分离器负责等待文件描述符或socket为读写操作准备就绪,然后将就绪事件传递给对应的处理器,最后由处理器负责完成实际的读写工作。   
  18.   
  19.       而在Proactor模式中,处理器--或者兼任处理器的事件分离器,只负责发起异步读写操作。IO操作本身由操作系统来完成。传递给操作系统的参数需要包括用户定义的数据缓冲区地址和数据大小,操作系统才能从中得到写出操作所需数据,或写入从socket读到的数据。事件分离器捕获IO操作完成事件,然后将事件传递给对应处理器。比如,在windows上,处理器发起一个异步IO操作,再由事件分离器等待IOCompletion事件。典型的异步模式实现,都建立在操作系统支持异步API的基础之上,我们将这种实现称为“系统级”异步或“真”异步,因为应用程序完全依赖操作系统执行真正的IO工作。   
  20.   
  21.  举个例子,将有助于理解Reactor与Proactor二者的差异,以读操作为例(类操作类似)。   
  22.  在Reactor中实现读:   
  23.  - 注册读就绪事件和相应的事件处理器   
  24.  - 事件分离器等待事件   
  25.  - 事件到来,激活分离器,分离器调用事件对应的处理器。   
  26.  - 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权。   
  27.   
  28.   
  29.  与如下Proactor(真异步)中的读过程比较:   
  30.  - 处理器发起异步读操作(注意:操作系统必须支持异步IO)。在这种情况下,处理器无视IO就绪事件,它关注的是完成事件。   
  31.  - 事件分离器等待操作完成事件   
  32.  - 在分离器等待过程中,操作系统利用并行的内核线程执行实际的读操作,并将结果数据存入用户自定义缓冲区,最后通知事件分离器读操作完成。   
  33.  - 事件分离器呼唤处理器。   
  34.  - 事件处理器处理用户自定义缓冲区中的数据,然后启动一个新的异步操作,并将控制权返回事件分离器。   
  35.      
  36.   
  37.  Java NIO非堵塞应用通常适用用在I/O读写等方面,我们知道,系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打开一个I/O通道后,read()将一直等待在端口一边读取字节内容,如果没有内容进来,read()也是傻傻的等,这会影响我们程序继续做其他事情,那么改进做法就是开设线程,让线程去等待,但是这样做也是相当耗费资源的。   
  38.   
  39. Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。   
  40.   
  41. Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。   
  42.   
  43.   
  44. NIO主要原理和适用。   
  45.   
  46. NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。   
  47.   
  48. Selector内部原理实际是在做一个对所注册的channel的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,他就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。

反应器(reactor)模式 相关内容

2016-02-22 22:24:40 iteye_6926 阅读数 18
[b][size=small]概述[/size][/b]
Java NIO非堵塞技术实际是采取反应器模式,或者说是观察者(observer)模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。

同步和异步的区别:有无通知(是否轮询)
阻塞与非阻塞的区别:操作结果是否等待(是否马上有返回值),只是设计方式的不同

NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,接着我们可以处理这些数据。

反应器模式与观察者模式在某些方面极为相似:当一个主体发生改变时,所有依属体都得到通知。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。

[b][size=small]一般模型[/size][/b]
我们想象以下情形:长途客车在路途上,有人上车有人下车,但是乘客总是希望能够在客车上得到休息。

传统的做法是:每隔一段时间(或每一个站),司机或售票员对每一个乘客询问是否下车。

反应器模式做法是:汽车是乘客访问的主体(Reactor),乘客上车后,到售票员(acceptor)处登记,之后乘客便可以休息睡觉去了,当到达乘客所要到达的目的地后,售票员将其唤醒即可。

原理图:
[img]http://dl2.iteye.com/upload/attachment/0115/2383/0e188d22-e97a-32ef-8422-a43d33896f1e.png[/img]


package com.cbf4life.reactor;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
* @Description:反应器模式,解决多用户并发访问问题
* 举例:餐厅服务
* 传统线程池做法:来一个客人(请求)请求一个服务员(线程)
* 反应器模式做法:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声“服务员”
* @Author: xiaoyun
* @Company: http://blog.csdn.net
* @CreateDate: 2016-2-20
*/
public class Reactor implements Runnable {

// 初始化选择器
public final Selector selector;
// 初始化通道
public final ServerSocketChannel serverSocketChannel;

public Reactor(int port) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
serverSocketChannel.socket().bind(inetSocketAddress);
serverSocketChannel.configureBlocking(false);

// 向选择器中注册该通道
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 利用selectionKey的attach方法绑定Acceptor,如果有请求过来,那么出发Acceptor
key.attach(new Acceptor(this));
}

@Override
public void run() {
try {
while(!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selectionKeys= selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
// Selector中如果发现channel中有OP_ACCEPT或者READ事件发生,下列遍历就会运行
while(it.hasNext()) {
// 来一个事件,第一次触发一个accepter线程
// 以后触发SockReadHander
SelectionKey key = it.next();
dispatch(key);
}
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}

public void dispatch(SelectionKey key) {
Runnable r = (Runnable)key.attachment();
if(r != null) {
r.run();
}
}

}


package com.cbf4life.reactor;

import java.io.IOException;
import java.nio.channels.SocketChannel;

/**
* @Description: 任务处理器
* @Author: xiaoyun
* @Company: http://blog.csdn.net
* @CreateDate: 2016-2-20
*/
public class Acceptor implements Runnable {

private Reactor reactor;

public Acceptor(Reactor reactor) {
this.reactor = reactor;
}

@Override
public void run() {
try {
SocketChannel socketChannel = reactor.serverSocketChannel.accept();
if(socketChannel != null) {
SocketReadHandler handler = new SocketReadHandler(reactor.selector, socketChannel);
}
} catch (IOException e) {
e.printStackTrace();
}

}

}

package com.cbf4life.reactor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @Description:执行器
* @Author: xiaoyun
* @Company: http://java.itcast.cn
* @CreateDate: 2016-2-20
*/
public class SocketReadHandler implements Runnable {

private SocketChannel socketChannel;

public SocketReadHandler(Selector selector, SocketChannel socketChannel) throws IOException {
this.socketChannel = socketChannel;
socketChannel.configureBlocking(false);
SelectionKey key = socketChannel.register(selector, 0);

// 将选择器的key值绑定为本handler,下一步有事件触发时,将调用本类的run方法
key.attach(this);
// 将该selectionKey标记为可读,以便读取
key.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}

/**
* 处理读取的数据
*/
@Override
public void run() {
ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
// 设置开始位置为0,结束为止为缓存字节长度
inputBuffer.clear();
try {
socketChannel.read(inputBuffer);
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(this);
} catch (IOException e) {
e.printStackTrace();
}
}

}




该文章转载自:[url]http://blog.csdn.net/linxcool/article/details/7771952[/url]

反应器(reactor)模式 相关内容

2016-04-04 20:28:43 xuyunti 阅读数 25

概述

Java NIO非堵塞技术实际是采取反应器模式,或者说是观察者(observer)模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。

同步和异步区别:有无通知(是否轮询)
堵塞和非堵塞区别:操作结果是否等待(是否马上有返回值),只是设计方式的不同

NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,接着我们可以处理这些数据。

反应器模式与观察者模式在某些方面极为相似:当一个主体发生改变时,所有依属体都得到通知。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。

一般模型

我们想象以下情形:长途客车在路途上,有人上车有人下车,但是乘客总是希望能够在客车上得到休息。

传统的做法是:每隔一段时间(或每一个站),司机或售票员对每一个乘客询问是否下车。

反应器模式做法是:汽车是乘客访问的主体(Reactor),乘客上车后,到售票员(acceptor)处登记,之后乘客便可以休息睡觉去了,当到达乘客所要到达的目的地后,售票员将其唤醒即可。

 

代码实现

[java] view plain copy
  1. package com.linxcool.reactor;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.InetAddress;  
  5. import java.net.InetSocketAddress;  
  6. import java.nio.channels.SelectionKey;  
  7. import java.nio.channels.Selector;  
  8. import java.nio.channels.ServerSocketChannel;  
  9. import java.util.Iterator;  
  10. import java.util.Set;  
  11.   
  12. /** 
  13.  * 反应器模式 
  14.  * 用于解决多用户访问并发问题 
  15.  *  
  16.  * 举个例子:餐厅服务问题 
  17.  *  
  18.  * 传统线程池做法:来一个客人(请求)去一个服务员(线程) 
  19.  * 反应器模式做法:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声“服务员” 
  20.  *  
  21.  * @author linxcool 
  22.  */  
  23. public class Reactor implements Runnable{  
  24.     public final Selector selector;  
  25.     public final ServerSocketChannel serverSocketChannel;  
  26.   
  27.     public Reactor(int port) throws IOException{  
  28.         selector=Selector.open();  
  29.         serverSocketChannel=ServerSocketChannel.open();  
  30.         InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port);  
  31.         serverSocketChannel.socket().bind(inetSocketAddress);  
  32.         serverSocketChannel.configureBlocking(false);  
  33.           
  34.         //向selector注册该channel    
  35.         SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  
  36.   
  37.         //利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor   
  38.         selectionKey.attach(new Acceptor(this));  
  39.     }  
  40.   
  41.     @Override  
  42.     public void run() {  
  43.         try {  
  44.             while(!Thread.interrupted()){  
  45.                 selector.select();  
  46.                 Set<SelectionKey> selectionKeys= selector.selectedKeys();  
  47.                 Iterator<SelectionKey> it=selectionKeys.iterator();  
  48.                 //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。  
  49.                 while(it.hasNext()){  
  50.                     //来一个事件 第一次触发一个accepter线程    
  51.                     //以后触发SocketReadHandler  
  52.                     SelectionKey selectionKey=it.next();  
  53.                     dispatch(selectionKey);  
  54.                     selectionKeys.clear();  
  55.                 }  
  56.             }  
  57.         } catch (IOException e) {  
  58.             e.printStackTrace();  
  59.         }  
  60.     }  
  61.       
  62.     /** 
  63.      * 运行Acceptor或SocketReadHandler 
  64.      * @param key 
  65.      */  
  66.     void dispatch(SelectionKey key) {  
  67.         Runnable r = (Runnable)(key.attachment());    
  68.         if (r != null){    
  69.             r.run();  
  70.         }    
  71.     }    
  72.   
  73. }  
[java] view plain copy
  1. package com.linxcool.reactor;  
  2.   
  3. import java.io.IOException;  
  4. import java.nio.channels.SocketChannel;  
  5.   
  6. public class Acceptor implements Runnable{  
  7.     private Reactor reactor;  
  8.     public Acceptor(Reactor reactor){  
  9.         this.reactor=reactor;  
  10.     }  
  11.     @Override  
  12.     public void run() {  
  13.         try {  
  14.             SocketChannel socketChannel=reactor.serverSocketChannel.accept();  
  15.             if(socketChannel!=null)//调用Handler来处理channel  
  16.                 new SocketReadHandler(reactor.selector, socketChannel);  
  17.         } catch (IOException e) {  
  18.             e.printStackTrace();  
  19.         }  
  20.     }  
  21. }  
[java] view plain copy
  1. package com.linxcool.reactor;  
  2.   
  3. import java.io.IOException;  
  4. import java.nio.ByteBuffer;  
  5. import java.nio.channels.SelectionKey;  
  6. import java.nio.channels.Selector;  
  7. import java.nio.channels.SocketChannel;  
  8.   
  9. public class SocketReadHandler implements Runnable{  
  10.     private SocketChannel socketChannel;  
  11.     public SocketReadHandler(Selector selector,SocketChannel socketChannel) throws IOException{  
  12.         this.socketChannel=socketChannel;  
  13.         socketChannel.configureBlocking(false);  
  14.           
  15.         SelectionKey selectionKey=socketChannel.register(selector, 0);  
  16.           
  17.         //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。    
  18.         //参看dispatch(SelectionKey key)    
  19.         selectionKey.attach(this);  
  20.           
  21.         //同时将SelectionKey标记为可读,以便读取。    
  22.         selectionKey.interestOps(SelectionKey.OP_READ);    
  23.         selector.wakeup();  
  24.     }  
  25.       
  26.     /** 
  27.      * 处理读取数据 
  28.      */  
  29.     @Override  
  30.     public void run() {  
  31.         ByteBuffer inputBuffer=ByteBuffer.allocate(1024);  
  32.         inputBuffer.clear();  
  33.         try {  
  34.             socketChannel.read(inputBuffer);  
  35.             //激活线程池 处理这些request  
  36.             //requestHandle(new Request(socket,btt));   
  37.         } catch (IOException e) {  
  38.             e.printStackTrace();  
  39.         }  
  40.     }  
  41. }  

 

反应器(reactor)模式 相关内容

2014-11-03 22:30:07 lgl125 阅读数 4760

两周前知道这个模式,现在稍微明白一些了。 写这篇博客的主要是方便以后复习。现在还是学生,相信这个模式在以后工作中会经常使用,因为百度的mentor发邮件说要学的几个设计模式里面就有这个……

反应器(Reactor)模式是为了处理一个或多个客户端同时提交的服务请求而设计的。事件驱动的应用程序可以使用反应器结构化模式,多路分解并分配从一个或多个客户端发给应用程序的服务请求。该模式的别名有:分配器(Dispatcher),通知器(Notifier)

以下例子摘自:http://daimojingdeyu.iteye.com/blog/828696

先用比较直观的方式来介绍一下这种方式的优点,通过和常用的多线程方式比较一下,可能更好理解。

以去饭店吃饭为例,每一伙人来就餐就是一个事件,吃饭的人会先看一下菜单,然后点菜。处理这些就餐事件的就需要我们的服务人员了。每个服务员相当于一个线程。 

多线程处理的方式会是这样的:

一个人来就餐,一个服务员去服务,然后客人会看菜单,点菜。 服务员将菜单给后厨。

二个人来就餐,二个服务员去服务……

五个人来就餐,五个服务员去服务……

 

这个就是多线程的处理方式,一个事件到来,就会有一个线程服务。很显然这种方式在人少的情况下会有很好的用户体验,每个客人都感觉自己是VIP,专人服务的。如果饭店一直这样同一时间最多来5个客人,这家饭店是可以很好的服务下去的。

 

来了一个好消息,因为这家店的服务好,吃饭的人多了起来。同一时间会来10个客人,老板很开心,但是只有5个服务员,这样就不能一对一服务了,有些客人就要没有人管了。老板就又请了5个服务员,现在好了,又能每个人都受VIP待遇了。

 

越来越多的人对这家饭店满意,客源又多了,同时来吃饭的人到了20人,老板高兴不起来了,再请服务员吧,占地方不说,还要开工钱,再请人就攒不到钱了。怎么办呢?老板想了想,10个服务员对付20个客人也是能对付过来的,服务员勤快点就好了,伺候完一个客人马上伺候另外一个,还是来得及的。综合考虑了一下,老板决定就使用10个服务人员的线程池啦~~~

 

但是这样有一个比较严重的缺点就是,如果正在接受服务员服务的客人点菜很慢,其他的客人可能就要等好长时间了。有些火爆脾气的客人可能就等不了走人了。

Reactor就可以很好的解决这个问题。

因为点菜才通常是很耗时的,所以当有人来吃饭的时候,可以先把菜单交给点菜的人自己浏览,等点菜的人想好了要点的菜的时候再招呼服务员,等服务员过来了之后就可以为顾客点菜并发送到后厨了。这个在某种意义上说就是用单线程在做多线程的事情。 

Reactor的事件驱动就体现在了只有当事件发生(客户招呼服务员点菜)的时候,服务员(线程)才去处理。而客户刚进入饭店的时候,是不需要去处理的。

从这个简单的例子应该可以基本明白Reactor是干什么的了吧。(由事件触发,并分发请求)

下面的内容主要来自:《面向模式的软件体系结构:卷2 用于并发和网络化对象的模式》的第三章的第一部分。点此下载点此下载整本书


一下主要针对论文中的几个图进行解释:

图1:反应器模式类图

Reactor有5个主要的成员:

Reactor(反应器):负责响应IO事件,一旦发生,广播发送给相应的Handler去处理。反应器向具体时间处理程序分配对应的钩子方法。具体的事件调用并不需要调用反应器,而是由反应器分配一个具体的事件处理程序。具体的事件处理程序对某个指定的事件的发生做出反应。 思想可参考Ioc

Synchronous Event Demultiplexer(同步事件分离器):这个分离器是一个函数。可以看到类图中的select()方法,就如java.nio.channels.Selector类中的select()方法一样。(后面对提到) 该函数一直在阻塞,直到某个事件的发生。就如服务员一直在select(),直到有个人叫她去点菜。

Handle(描述符):就如java.nio.channels.SelectionKey类。用来识别事件源,看这个时间是要做什么的,是请求连接,还是要读、写……

Event Handler(事件处理接口):定义一个由一个或多个钩子方法组成的接口。

Concrete Event Handler(具体的事件处理接口):无。



图2 :客户机连接到登录服务器时采取的步骤

1 登录服务器首先向反应器注册接受器。

2 服务器调用事件处理方法(handle_event),这个方法是循环的。

3 调用同步事件分离器的select方法。这个方法是在handle_event中执行的。

4 客户端发来连接请求

5 handle_event方法中通知Acceptor 

6 Acceptor接收客户机的请求

7 Acceptor创建一个Handler处理客户端的请求。

8 Handler向反应器注册socket handle,即告诉分发器当这个socket什么时候“准备就绪”一定要notify我。


图3:服务器为一条登录记录服务时要采取的步骤

1 客户端发送登录记录请求

2 阻塞的select()方法捕捉到了一个事件,并有handler_event方法传递给Handler处理。

3 登录处理程序从Socket中非阻塞的读取记录  重复2,3步,直到从socket中获取的记录被完全的接收了。

4 Handler处理记录并执行写方法。

5 Handler方法返回,循环继续执行。

以上的图片和文字来自中文的电子书。 其实书中也是从英文论文中翻译过来的。英语好的直接看原文吧。

  简单的实现这个模式需要三个类来完成,本人也是参见了网上其他人的代码:http://gzcj.iteye.com/blog/307217 程序没有客户端进行测试,希望真心想学习的人自己写一个客户端测试一下。

这个是Reactor的代码,其中LoggingAcceptor作为其内部类。写到了Reactor的内部。

package com.csdn;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Logger;

/**
 * 以下代码中巧妙使用了SocketChannel的attach功能, 将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,
 * 可以立即触发相应链接的Handler。
 */
public class Reactor implements Runnable {
	final Selector selector;
	final ServerSocketChannel serverSocket;

	Reactor(int port) throws IOException {
		selector = Selector.open(); // 创建选择器
		serverSocket = ServerSocketChannel.open(); // 打开服务器套接字通道
		InetSocketAddress address = new InetSocketAddress(
				InetAddress.getLocalHost(), port);
		serverSocket.socket().bind(address);

		serverSocket.configureBlocking(false); // 调整此通道的阻塞模式。 - 异步

		SelectionKey sk = serverSocket.register(selector, // 向selector注册该channel
				SelectionKey.OP_ACCEPT); // 用于套接字接受操作的操作集位。

		// 利用selectionKey的attache功能绑定Acceptor,Acceptor叫做附加对象 如果有事情,触发Acceptor
		//接收客户机的请求
		sk.attach(new LoggingAcceptor()); // 将给定的对象附加到此键。
	}

	//这个相当于handle_event()方法
	public void run() { // normally in a new Thread
		try {
			while (!Thread.interrupted()) {
				selector.select();
				Set selected = selector.selectedKeys();
				Iterator it = selected.iterator();
				// Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
				while (it.hasNext())
					// 来一个事件 第一次触发一个accepter线程
					// 以后触发SocketReadHandler
					dispatch((SelectionKey) (it.next()));
				selected.clear();
			}
		} catch (IOException ex) {
		}
	}

	void dispatch(SelectionKey k) {
		Runnable r = (Runnable) (k.attachment());
		if (r != null) {
			//handle_event方法中通知Acceptor或者Handler 
			r.run();
		}
	}

	class LoggingAcceptor implements Runnable { // inner
		public void run() {
			try {
				SocketChannel c = serverSocket.accept();
				if (c != null)
					//Acceptor创建一个Handler处理客户端的请求
					new LoggingHandler(selector, c); // 调用Handler来处理channel
			} catch (IOException ex) {
			}
		}
	}
}

下面是Handler的代码,没有写成一个接口+继承的样式。

package com.csdn;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class LoggingHandler implements Runnable {

	// private Test test=new Test();

	final SocketChannel socket;
	final SelectionKey sk;

	static final int READING = 0, SENDING = 1;
	int state = READING;

	public LoggingHandler(Selector sel, SocketChannel c) throws IOException {

		socket = c;

		socket.configureBlocking(false);
		//Handler向反应器注册socket handle,即告诉分发器当这个socket什么时候“准备就绪”一定要notify我
		sk = socket.register(sel, 0);

		// 将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。
		// 参看dispatch(SelectionKey k)
		sk.attach(this);

		// 同时将SelectionKey标记为可读,以便读取。
		sk.interestOps(SelectionKey.OP_READ);
		sel.wakeup();
	}

	public void run() {
		try {
			// test.read(socket,input);
			readRequest();
		} catch (Exception ex) {
		}
	}

	/**
	 * 处理读取data
	 * 
	 * @param key
	 * @throws Exception
	 */
	private void readRequest() throws Exception {

		ByteBuffer input = ByteBuffer.allocate(1024);
		input.clear();
		try {
			int bytesRead = socket.read(input);
			// 激活线程池 处理这些request
			// requestHandle(new Request(socket,btt));
		} catch (Exception e) {
		}
	}
}

java的nio实际上就是采用的Reactor模式。 本来打算今天把这个也写出来 没时间了只能下次了。

有几篇博客还是不错的:

http://www.jdon.com/concurrent/reactor.htm

http://blog.csdn.net/vking_wang/article/details/14166493(强烈推荐)

http://www.jdon.com/concurrent/reactor.htm

scalable_nio.pdf 英文原文课件

好多年没自己写东西了,写的匆忙,忘各位大牛不吝赐教。


反应器(reactor)模式 相关内容

2015-01-09 18:45:48 yueqian_zhu 阅读数 448

概述

Java NIO非堵塞技术实际是采取反应器模式,或者说是观察者(observer)模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。

同步和异步区别:有无通知(是否轮询)
堵塞和非读者区别:操作结果是否等待(是否马上又返回值),只是设计方式的不同

NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,接着我们可以处理这些数据。

反应器模式与观察者模式在某些方面极为相似:当一个主体发生改变时,所有依属体都得到通知。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。

一般模型

我们想象以下情形:长途客车在路途上,有人上车有人下车,但是乘客总是希望能够在客车上得到休息。

传统的做法是:每隔一段时间(或每一个站),司机或售票员对每一个乘客询问是否下车。

反应器模式做法是:汽车是乘客访问的主体(Reactor),乘客上车后,到售票员(acceptor)处登记,之后乘客便可以休息睡觉去了,当到达乘客所要到达的目的地后,售票员将其唤醒即可。


例:

package com.linxcool.reactor;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 反应器模式
 * 用于解决多用户访问并发问题
 * 
 * 举个例子:餐厅服务问题
 * 
 * 传统线程池做法:来一个客人(请求)去一个服务员(线程)
 * 反应器模式做法:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声“服务员”
 * 
 * @author linxcool
 */
public class Reactor implements Runnable{
	public final Selector selector;
	public final ServerSocketChannel serverSocketChannel;

	public Reactor(int port) throws IOException{
		selector=Selector.open();
		serverSocketChannel=ServerSocketChannel.open();
		InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port);
		serverSocketChannel.socket().bind(inetSocketAddress);
		serverSocketChannel.configureBlocking(false);
		
		//向selector注册该channel  
		SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

		//利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor 
		selectionKey.attach(new Acceptor(this));
	}

	@Override
	public void run() {
		try {
			while(!Thread.interrupted()){
				selector.select();
				Set<SelectionKey> selectionKeys= selector.selectedKeys();
				Iterator<SelectionKey> it=selectionKeys.iterator();
				//Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
				while(it.hasNext()){
					//来一个事件 第一次触发一个accepter线程  
					//以后触发SocketReadHandler
					SelectionKey selectionKey=it.next();
					dispatch(selectionKey);
					selectionKeys.clear();
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 运行Acceptor或SocketReadHandler
	 * @param key
	 */
	void dispatch(SelectionKey key) {
		Runnable r = (Runnable)(key.attachment());  
		if (r != null){  
			r.run();
		}  
	}  

}


 

package com.linxcool.reactor;

import java.io.IOException;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable{
	private Reactor reactor;
	public Acceptor(Reactor reactor){
		this.reactor=reactor;
	}
	@Override
	public void run() {
		try {
			SocketChannel socketChannel=reactor.serverSocketChannel.accept();
			if(socketChannel!=null)//调用Handler来处理channel
				new SocketReadHandler(reactor.selector, socketChannel);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}


 

package com.linxcool.reactor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class SocketReadHandler implements Runnable{
	private SocketChannel socketChannel;
	public SocketReadHandler(Selector selector,SocketChannel socketChannel) throws IOException{
		this.socketChannel=socketChannel;
		socketChannel.configureBlocking(false);
		
		SelectionKey selectionKey=socketChannel.register(selector, 0);
		
		//将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。  
		//参看dispatch(SelectionKey key)  
		selectionKey.attach(this);
		
		//同时将SelectionKey标记为可读,以便读取。  
		selectionKey.interestOps(SelectionKey.OP_READ);  
		selector.wakeup();
	}
	
	/**
	 * 处理读取数据
	 */
	@Override
	public void run() {
		ByteBuffer inputBuffer=ByteBuffer.allocate(1024);
		inputBuffer.clear();
		try {
			socketChannel.read(inputBuffer);
			//激活线程池 处理这些request
			//requestHandle(new Request(socket,btt)); 
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}


 

 

 

转自:http://blog.csdn.net/linxcool/article/details/7771952

反应器(reactor)模式 相关内容

反应器模式(Reactor)

阅读数 1116

Reactor反应器模式

阅读数 344

反应器(Reactor)模式

阅读数 64345

没有更多推荐了,返回首页