

 

NIO is non-blocking I/O combined with a multiplexed dispatch model.
Take a chat server as an example. With the old blocking I/O you had to create one thread per connection, because a call such as in.read(buf) blocks the calling thread. With NIO you simply register for events; internally it uses the reactor model, and your handler is only scheduled when an I/O event actually occurs, instead of a thread sitting there waiting.

Distributed computing and Web Services are everywhere today, and underneath, these network services all come down to socket operations. They share a common structure:
1. Read request
2. Decode request
3. Process service
4. Encode reply
5. Send reply

The classic network service design is shown in the figure below: each thread carries one connection's data through all of these steps:

But this model degrades very quickly as the user load increases. We need a design that keeps data processing flowing smoothly, and an event-driven mechanism is clearly the best answer: when an event occurs, the corresponding handler is triggered and processing begins.
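To make the contrast concrete, here is a minimal sketch of that classic thread-per-connection model (the class name, port and echo logic are illustrative, not from the original article): one blocking accept loop, plus one thread per client that reads the request, processes it and sends the reply.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class ClassicBlockingServer {
    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(9000)) {
            while (true) {
                Socket socket = server.accept();          // blocks until a client connects
                new Thread(() -> handle(socket)).start(); // one thread per connection
            }
        }
    }

    private static void handle(Socket socket) {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
            String request = in.readLine();    // 1. read request (blocks)
            String reply = "echo: " + request; // 2-4. decode, process, encode
            out.println(reply);                // 5. send reply
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}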

The Reactor pattern is similar to event handling in AWT:

Participants in the Reactor pattern

1. Reactor: responds to I/O events and, when one occurs, dispatches it to the corresponding Handler, much like the AWT event dispatch thread.
2. Handler: performs the non-blocking work, similar to an AWT ActionListener; binding handlers to events corresponds to AWT's addActionListener.

As the diagram shows:

Java NIO provides the basic machinery for implementing the Reactor pattern: when its Selector detects that a channel has data, it tells us through a SelectionKey, and that is where we bind the event to its handler.

Let's look at the Reactor code:

 

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import org.apache.log4j.Logger;

public class Reactor implements Runnable {
  static final Logger logger = Logger.getLogger(Reactor.class);

  final Selector selector;
  final ServerSocketChannel serverSocket;

  Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port);
    serverSocket.socket().bind(address);
    serverSocket.configureBlocking(false);

    // register the channel with the selector for accept events
    SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    logger.debug("-->Start serverSocket.register!");

    // use the key's attach facility to bind an Acceptor: when an event fires, the Acceptor is dispatched
    sk.attach(new Acceptor());
    logger.debug("-->attach(new Acceptor()!");
  }

  public void run() { // normally in a new Thread
    try {
      while (!Thread.interrupted()) {
        selector.select();
        Set<SelectionKey> selected = selector.selectedKeys();
        Iterator<SelectionKey> it = selected.iterator();
        // if the Selector finds a channel with an OP_ACCEPT or OP_READ event, this loop runs
        while (it.hasNext()) {
          // the first event dispatches the Acceptor; later events dispatch a SocketReadHandler
          dispatch(it.next());
        }
        selected.clear();
      }
    } catch (IOException ex) {
      logger.debug("reactor stop!" + ex);
    }
  }

  // run the attached Acceptor or SocketReadHandler
  void dispatch(SelectionKey k) {
    Runnable r = (Runnable) k.attachment();
    if (r != null) {
      r.run();
    }
  }

  class Acceptor implements Runnable { // inner
    public void run() {
      try {
        logger.debug("-->ready for accept!");
        SocketChannel c = serverSocket.accept();
        if (c != null) {
          // hand the new channel to a read handler
          new SocketReadHandler(selector, c);
        }
      } catch (IOException ex) {
        logger.debug("accept stop!" + ex);
      }
    }
  }
}
The code above makes clever use of the SelectionKey attach facility to tie a Handler to the channel on which events may occur, so that when an event fires, the attached Handler is triggered immediately.
Now look at the Handler code:
 
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

import org.apache.log4j.Logger;

public class SocketReadHandler implements Runnable {
  public static Logger logger = Logger.getLogger(SocketReadHandler.class);

  final SocketChannel socket;
  final SelectionKey sk;

  static final int READING = 0, SENDING = 1;
  int state = READING;

  public SocketReadHandler(Selector sel, SocketChannel c) throws IOException {
    socket = c;
    socket.configureBlocking(false);
    sk = socket.register(sel, 0);

    // attach this Handler to the SelectionKey: the next event on this channel
    // will invoke this class's run method (see Reactor.dispatch(SelectionKey k))
    sk.attach(this);

    // mark the key as interested in reads so data can be read, then wake the selector up
    sk.interestOps(SelectionKey.OP_READ);
    sel.wakeup();
  }

  public void run() {
    try {
      readRequest();
    } catch (Exception ex) {
      logger.debug("readRequest error" + ex);
    }
  }

  /**
   * Read the incoming data.
   */
  private void readRequest() throws Exception {
    ByteBuffer input = ByteBuffer.allocate(1024);
    input.clear();
    try {
      int bytesRead = socket.read(input);
      logger.debug("read " + bytesRead + " bytes");
      // ... (further handling elided in the original)
      // hand the request over to a thread pool for processing, e.g.:
      // requestHandle(new Request(socket, btt));  // requestHandle, Request and btt are not defined in this excerpt
      // ...
    } catch (Exception e) {
      logger.debug("read error" + e);
    }
  }
}

 



Note that the Handler performs attach again, replacing the Acceptor, so the next time a READ event occurs on this channel the Handler is triggered directly, and the read / process / write / send cycle begins.

Once the data has been read, processing can be handed off to a thread pool: each request is dropped into the pool immediately after it is read, which speeds things up, as sketched below:
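Here is a rough sketch of that handoff, assuming a hypothetical RequestProcessor class and a fixed-size pool (neither is part of the original code): the read handler would pass the filled buffer to requestHandle, and a worker thread does the decode/process/encode work.

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RequestProcessor {
    private final ExecutorService pool = Executors.newFixedThreadPool(8);

    // called by the read handler once the bytes have been read into the buffer
    public void requestHandle(SocketChannel channel, ByteBuffer data) {
        pool.submit(() -> {
            data.flip();
            byte[] bytes = new byte[data.remaining()];
            data.get(bytes);
            // decode, process and encode here, then write the reply back on the channel
            System.out.println("processing " + bytes.length + " bytes from " + channel);
        });
    }
}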

Going further, we can use several Selectors and handle connections and reads separately.

A high-performance Java network service mechanism is taking shape, and exciting cluster-style parallel computing is within reach.

http://www.jdon.com/concurrent/reactor.htm

 

--------------

Java NIO's non-blocking mode is typically used for I/O. As we know, a system's performance bottleneck is usually I/O, including reads and writes on ports and files. In the past, once an I/O channel was opened, read() would sit on the port waiting for bytes; if nothing arrived, read() simply waited, keeping the program from doing anything else. The workaround was to create a thread and let that thread do the waiting, but that is also quite expensive.

Java NIO's non-blocking support actually uses the Reactor pattern (or, put differently, the Observer pattern) to watch the I/O port for us. When data arrives we are notified automatically, so we no longer need multiple threads waiting around; seen from the outside, I/O reads and writes flow smoothly and nothing blocks.

Java NIO is more than a performance improvement; you see it introduced everywhere because it is a milestone: starting with JDK 1.4, Java added performance-oriented features that let it stand alongside C or Perl for low-level and parallel or distributed computing work.

If you still doubt Java's performance, your view is simply out of date; Java deserves a new label every year or two. Starting with JDK 1.5 there is new support for threads and concurrency, Java has become viable for real-time domains such as games, and having secured its middleware position it is eating into traditional C territory.

This article gives a brief introduction to the basic principles of NIO; the next article combines the Reactor pattern with a well-known paper by concurrency expert Doug Lea for a deeper discussion.

NIO: main principle and usage.

NIO's central class is Selector, which works like an observer: we tell the Selector which SocketChannels we want to watch and then go do something else. When an event occurs it notifies us, handing back a set of SelectionKeys; from each key we retrieve the SocketChannel we registered, read its data (rest assured, there will be data to read), and then process it.

Internally, the Selector keeps polling the registered channels (that is currently its only algorithm). As soon as it finds a channel on which a registered event has occurred (data has arrived, for example), it raises its hand and hands over a key, through which we read that channel's contents.

With this basic principle in mind, let's look at the code. There are two styles of use: with threads and without. The latter is simpler; see the code below:

 

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

/**
 * A simple non-blocking server listening on port 9000.
 */
public class NBTest {

  /** Creates new NBTest */
  public NBTest() {
  }

  public void startServer() throws Exception {
    int channels = 0;
    int nKeys = 0;

    // open a Selector
    Selector selector = Selector.open();

    // open a Channel and bind it to port 9000
    ServerSocketChannel ssc = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 9000);
    ssc.socket().bind(address);

    // switch the channel to non-blocking mode
    ssc.configureBlocking(false);

    // register the channel and the events we are interested in with the Selector
    SelectionKey s = ssc.register(selector, SelectionKey.OP_ACCEPT);
    printKeyInfo(s);

    while (true) { // keep polling
      debug("NBTest: Starting select");

      // select() tells us that one of the events we registered for has happened
      nKeys = selector.select();
      // if any registered event happened, the return value is greater than 0
      if (nKeys > 0) {
        debug("NBTest: Number of keys after select operation: " + nKeys);

        // the Selector hands back a set of SelectionKeys;
        // key.channel() gives us back the channel we registered
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> i = selectedKeys.iterator();
        while (i.hasNext()) {
          s = i.next();
          printKeyInfo(s);
          debug("NBTest: Nr Keys in selector: " + selector.keys().size());

          // once a key has been handled it must be removed from the ready-keys set
          i.remove();
          if (s.isAcceptable()) {
            // accept the connection and get the new SocketChannel
            SocketChannel sc = ((ServerSocketChannel) s.channel()).accept();
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            System.out.println(++channels);
          } else {
            debug("NBTest: Channel not acceptable");
          }
        }
      } else {
        debug("NBTest: Select finished without any keys.");
      }
    }
  }

  private static void debug(String s) {
    System.out.println(s);
  }

  private static void printKeyInfo(SelectionKey sk) {
    String s = "Att: " + (sk.attachment() == null ? "no" : "yes");
    s += ", Read: " + sk.isReadable();
    s += ", Acpt: " + sk.isAcceptable();
    s += ", Cnct: " + sk.isConnectable();
    s += ", Wrt: " + sk.isWritable();
    s += ", Valid: " + sk.isValid();
    s += ", Ops: " + sk.interestOps();
    debug(s);
  }

  /**
   * @param args the command line arguments
   */
  public static void main(String args[]) {
    NBTest nbTest = new NBTest();
    try {
      nbTest.startServer();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

 



 

This is a non-blocking server waiting on port 9000. If you write a client program you can interact with it, or you can simply connect with: telnet hostname 9000.
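For example, a minimal client (a sketch; the class name and the message are made up) just opens a socket to the same host and port and writes one line:

import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;

public class NBTestClient {
    public static void main(String[] args) throws IOException {
        // connect to the non-blocking server on port 9000 and send one line
        try (Socket socket = new Socket(InetAddress.getLocalHost(), 9000);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
            out.println("hello from client");
        }
    }
}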

Read through this example carefully and you should have a good grasp of NIO's principle and usage. In the next article we will process the data with multiple threads and build a Reactor pattern of our own.

--------------

In high-performance I/O design there are two well-known patterns, Reactor and Proactor: Reactor is used for synchronous I/O, while Proactor is used for asynchronous I/O.
Synchronous vs. asynchronous
Synchronous and asynchronous describe the interaction between the application and the kernel. Synchronous means the user process starts an I/O operation and then waits for it, or polls to check whether it is ready; asynchronous means the user process starts the I/O operation, goes on with its own work, and is notified when the I/O has completed.
Blocking vs. non-blocking
In blocking mode a read or write call waits until it can proceed; in non-blocking mode the call returns a status value immediately.

I/O models can therefore be classified as: synchronous blocking, synchronous non-blocking, asynchronous blocking, and asynchronous non-blocking I/O.
1. Synchronous blocking
After starting an I/O operation the user process must wait for it to complete; only when the I/O has really finished can the process continue. Java's traditional I/O model works this way.

2. Synchronous non-blocking
After starting an I/O operation the user process can return and do other things, but it has to keep asking whether the I/O is ready; this constant polling wastes CPU. Java's NIO falls into this category.

3. Asynchronous blocking
Here the application starts an I/O operation and does not wait for the kernel to finish it; the kernel notifies the application when the I/O is done. That is the crucial difference between synchronous and asynchronous: synchronous must wait or actively ask whether the I/O is complete. So why call it blocking? Because it is implemented with the select system call, and select itself blocks; the advantage of select is that it can monitor many file descriptors at once, which improves concurrency.

4. Asynchronous non-blocking
The user process only initiates an I/O operation and returns immediately; when the I/O really completes, the application is notified and only has to process the data, without performing the actual read or write, because the kernel has already done that. Java did not support this model when this was written (JDK 7's NIO.2 later added asynchronous channels).

To sum up: synchronous vs. asynchronous is about how the application and the kernel interact (synchronous has to ask actively; asynchronous is notified by the kernel when the I/O event occurs), while blocking vs. non-blocking is merely about how the system call itself is implemented.

The Reactor pattern
Often called the reactor (dispatcher) pattern, it is widely used in concurrent systems in place of the usual multi-threaded approach, saving resources and raising throughput.
For example, under high concurrency:
Multi-threaded handling: start a dedicated thread for every incoming request. This is expensive, and on a single-core machine multiple threads do not improve performance unless some of them block; otherwise the cost of thread switching actually slows processing down.
Reactor-style handling: the server runs a single thread that polls whether I/O operations are ready and performs the corresponding read or write only when they are. This avoids creating large numbers of threads on the server and the performance cost of switching between them. (Java's NIO uses this model, which raises a further question: how NIO scales on multi-core machines.)

Both approaches are synchronous. Multi-threading is the traditional way of handling high concurrency; the Reactor pattern is today's mainstream approach to high concurrency and high performance.

The Proactor pattern
It applies to asynchronous I/O. In the Proactor pattern the application does not perform the actual read or write; it only reads from or writes to a buffer, and the operating system transfers the buffer to or from the real I/O device.

--------------

Original article: https://www.infoq.com/articles/reactor-by-example

Key takeaways

  • Reactor is a reactive streams library targeting Java 8 and providing an Rx-conforming API
  • It uses the same approach and philosophy as RxJava despite some API differences
  • It is a 4th generation reactive library that allows operator fusion, like RxJava 2
  • Reactor is a core dependency in the reactive programming model support of Spring Framework 5.

RxJava recap

Reactor, like RxJava 2, is a fourth generation reactive library. It has been launched by Spring custodian Pivotal, and builds on the Reactive Streams specification, Java 8, and the ReactiveX vocabulary. Its design is the result of a savant mix fueled by designs and core contributors from Reactor 2 (the previous major version) and RxJava.

In previous articles in this series, "RxJava by Example" and "Testing RxJava", you learned about the basics of reactive programming: how data is conceptualized as a stream, the Observable class and its various operators, the factory methods that create Observables from static and dynamic sources.

Observable is the push source and Observer is the simple interface for consuming this source via the act of subscribing. Keep in mind that the contract of an Observable is to notify its Observer of 0 or more data items through onNext, optionally followed by either an onError or onComplete terminating event.

To test an Observable, RxJava provides a TestSubscriber, which is a special flavor of Observer that allows you to assert events in your stream.

 

In this article we'll draw a parallel between Reactor and what you already learned about RxJava, and showcase the common elements as well as the differences.

Reactor's types

Reactor's two main types are the Flux<T> and Mono<T>. A Flux is the equivalent of an RxJava Observable, capable of emitting 0 or more items, and then optionally either completing or erroring.

A Mono on the other hand can emit at most once. It corresponds to both the Single and Maybe types on the RxJava side. Thus an asynchronous task that just wants to signal completion can use a Mono<Void>.

This simple distinction between two types makes things easy to grasp while providing meaningful semantics in a reactive API: by just looking at the returned reactive type, one can know if a method is more of a "fire-and-forget" or "request-response" (Mono) kind of thing or is really dealing with multiple data items as a stream (Flux).

Both Flux and Mono make use of this semantic by coercing to the relevant type when using some operators. For instance, calling single() on a Flux<T> will return a Mono<T>, whereas concatenating two monos together using concatWith will produce a Flux. Similarly, some operators will make no sense on a Mono (for example take(n), which produces n > 1 results), whereas other operators will only make sense on a Mono (e.g. or(otherMono)).
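For instance, a couple of illustrative lines (not from the article's own snippets) show this coercion in both directions:

Mono<String> one = Flux.just("single value").single();        // Flux<String> coerced to Mono<String>
Flux<String> two = Mono.just("a").concatWith(Mono.just("b")); // concatenating Monos produces a Flux<String>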

One aspect of the Reactor design philosophy is to keep the API lean, and this separation into two reactive types is a good middle ground between expressiveness and API surface.

"Build on Rx, with Reactive Streams at every stage"

As expressed in "RxJava by Example", RxJava bears some superficial resemblance to Java 8 Streams API, in terms of concepts. Reactor on the other hand looks a lot like RxJava, but this is of course in no way a coincidence. The intention is to provide a Reactive Streams native library that exposes an Rx-conforming operator API for asynchronous logic composition. So while Reactor is rooted in Reactive Streams, it seeks general API alignment with RxJava where possible.

Reactive Libraries and Reactive Streams adoption

Reactive Streams (abbreviated RS in the remainder of this article) is "an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure". It is a set of textual specifications along with a TCK and four simple interfaces (Publisher, Subscriber, Subscription and Processor), which will be integrated in Java 9.

It mainly deals with the concept of reactive-pull back-pressure (more on that later) and how to interoperate between several implementing reactive sources. It doesn't cover operators at all, focusing instead exclusively on the stream's lifecycle.

A key differentiator for Reactor is its RS-first approach. Both Flux and Mono are RS Publisher implementations and conform to reactive-pull back-pressure.

In RxJava 1 only a subset of operators support back-pressure, and even though RxJava 1 has adapters to RS types, its Observable doesn't implement these types directly. That is easily explained by the fact that RxJava 1 predates the RS specification and served as one of the foundational works during the specification's design.

That means that each time you use these adapters you are left with a Publisher, which again doesn't have any operator. In order to do anything useful from there, you'll probably want to go back to an Observable, which means using yet another adapter. This visual clutter can be detrimental to readability, especially when an entire framework like Spring 5 directly builds on top of Publisher.

Another difference with RxJava 1 to keep in mind when migrating to Reactor or RxJava 2 is that in the RS specification, null values are not authorized. It might turn out important if your code base uses null to signal some special cases.

RxJava 2 was developed after the Reactive Streams specification, and thus has a direct implementation of Publisher in its new Flowable type. But instead of focusing exclusively on RS types, RxJava 2 also keeps the "legacy" RxJava 1 types (Observable, Completable, and Single) and introduces the "RxJava Optional", Maybe. Although they still provide the semantic differentiation we talked about earlier, these types have the drawback of not implementing RS interfaces. Note that unlike in RxJava 1, the RxJava 2 Observable does not support the backpressure protocol (a feature now exclusively reserved to Flowable). It has been kept for the purpose of providing a rich and fluent API for cases, such as user interface eventing, where backpressure is impractical or impossible. Completable, Single and Maybe have, by design, no need for backpressure support; they will offer a rich API as well and defer any workload until subscribed.

Reactor is once again leaner in this area, sporting its Mono and Flux types, both implementing Publisher and both backpressure-ready. There's a relatively small overhead for Mono to behave as a Publisher, but it is mostly offset by other Mono optimizations. We'll see in a later section what backpressure means for Mono.

An API similar but not equal to RxJava's

The ReactiveX and RxJava vocabulary of operators can be overwhelming at times, and some operators can have confusing names for historical reasons. Reactor aims to have a more compact API and to deviate in some cases, e.g. in order to choose better names, but overall the two APIs look a lot alike. In fact the latest iterations in RxJava 2 actually borrow some vocabulary from Reactor as well, a hint of the ongoing close collaboration between the two projects. Some operators and concepts first appear in one library or the other, but often end up in both.

For instance, Flux has the same familiar just factory method (albeit having only two just variants: one element and a vararg). But from has been replaced by several explicit variants, most notable being fromIterable. Flux also has all the usual suspects in terms of operators: map, merge, concat, flatMap, take, etc.

One example of an RxJava operator name that Reactor eschewed was the puzzling amb operator, which has been replaced with the more appropriately named firstEmitting. Additionally, to introduce greater consistency in the API, toList has been renamed collectList. In fact all collectXXX operators now aggregate values into a specific type of collection but still produce a Mono of said collection, while toXXX methods are reserved for type conversions that take you out of the reactive world, e.g. toFuture().
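As a small illustration of that convention (a short fragment, not from the original article): collectList stays in the reactive world, while toFuture leaves it:

Mono<List<String>> letters = Flux.just("a", "b", "c").collectList(); // still reactive
CompletableFuture<List<String>> future = letters.toFuture();         // out of the reactive world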

One more means by which Reactor can be leaner, this time in terms of class instantiation and resource usage, is fusion: Reactor is capable of merging multiple sequential uses of certain operators (e.g. calling concatWith twice) into a single use, only instantiating the operator's inner classes once (macro-fusion). That includes some data-source-based optimization which greatly helps Mono offset the cost of implementing Publisher. It is also capable of sharing resources like inner queues between several compatible operators (micro-fusion). These capabilities make Reactor a fourth-generation reactive library. But that is a topic for a future article.

Let's take a closer look at a few Reactor operators. (You will notice the contrast with some of the examples in the earlier articles in our series.)

A few operator examples

(This section contains snippets of code, and we encourage you to try them and experiment further with Reactor. To that effect, you should open your IDE of choice and create a test project with Reactor as a dependency.)

To do so in Maven, add the following to the dependencies section of your pom.xml:

<dependency>
    <groupId>io.projectreactor</groupId>	
    <artifactId>reactor-core</artifactId>
    <version>3.0.3.RELEASE</version>
</dependency>

To do the same in Gradle, edit the dependencies section to add reactor, similarly to this:

dependencies {
    compile "io.projectreactor:reactor-core:3.0.3.RELEASE"
}

Let's play with examples used in the previous articles in this series!

Very similarly to how you would create your first Observable in RxJava, you can create a Flux using the just(T…) and fromIterable(Iterable<T>) Reactor factory methods. Remember that given a List, just would emit the list as one whole, single emission, while fromIterable will emit each element from the iterable list:

public class ReactorSnippets {
  private static List<String> words = Arrays.asList(
        "the",
        "quick",
        "brown",
        "fox",
        "jumped",
        "over",
        "the",
        "lazy",
        "dog"
        );

  @Test
  public void simpleCreation() {
     Flux<String> fewWords = Flux.just("Hello", "World");
     Flux<String> manyWords = Flux.fromIterable(words);

     fewWords.subscribe(System.out::println);
     System.out.println();
     manyWords.subscribe(System.out::println);
  }
}

Like in the corresponding RxJava examples, this prints
Hello
World

the
quick
brown
fox
jumped
over
the
lazy
dog

In order to output the individual letters in the fox sentence we'll also need flatMap (as we did in RxJava by Example), but in Reactor we use fromArray instead of from. We then want to filter out duplicate letters and sort them using distinct and sort. Finally, we want to output an index for each distinct letter, which can be done using zipWith and range:

@Test
public void findingMissingLetter() {
  Flux<String> manyLetters = Flux
        .fromIterable(words)
        .flatMap(word -> Flux.fromArray(word.split("")))
        .distinct()
        .sort()
        .zipWith(Flux.range(1, Integer.MAX_VALUE),
              (string, count) -> String.format("%2d. %s", count, string));

  manyLetters.subscribe(System.out::println);
}

This helps us notice the s is missing as expected:

1. a
2. b
...
18. r
19. t
20. u
...
25. z

One way of fixing that is to correct the original words array, but we could also manually add the "s" value to the Flux of letters using concat/concatWith and a Mono:

@Test
public void restoringMissingLetter() {
  Mono<String> missing = Mono.just("s");
  Flux<String> allLetters = Flux
        .fromIterable(words)
        .flatMap(word -> Flux.fromArray(word.split("")))
        .concatWith(missing)
        .distinct()
        .sort()
        .zipWith(Flux.range(1, Integer.MAX_VALUE),
              (string, count) -> String.format("%2d. %s", count, string));

  allLetters.subscribe(System.out::println);
}

This adds the missing s just before we filter out duplicates and sort/count the letters:

1. a
2. b
...
18. r
19. s
20. t
...
26. z

The previous article noted the resemblance between the Rx vocabulary and the Streams API, and in fact when the data is readily available from memory, Reactor, like Java Streams, acts in simple push mode (see the backpressure section below to understand why). More complex and truly asynchronous snippets wouldn't work with this pattern of just subscribing in the main thread, primarily because control would return to the main thread and then exit the application as soon as the subscription is done. For instance:

@Test
public void shortCircuit() {
  Flux<String> helloPauseWorld = 
              Mono.just("Hello")
                  .concatWith(Mono.just("world")
                  .delaySubscriptionMillis(500));

  helloPauseWorld.subscribe(System.out::println);
}

This snippet prints "Hello", but fails to print the delayed "world" because the test terminates too early. In snippets and tests where you only sort of write a main class like this, you'll usually want to revert back to blocking behavior. To do that you could create a CountDownLatch and call countDown in your subscriber (both in onError and onComplete). But then that's not very reactive, is it? (and what if you forget to count down, in case of error for instance?)
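For reference, a hedged sketch of that CountDownLatch workaround (not shown in the original article) could look like this; it also makes clear why forgetting the error case would hang the test:

CountDownLatch latch = new CountDownLatch(1);
helloPauseWorld.subscribe(
    System.out::println,
    error -> latch.countDown(),   // don't forget the error case, or the latch never opens
    latch::countDown);            // count down on completion
latch.await(2, TimeUnit.SECONDS); // block the test thread until done or timed out
                                  // (the test method has to declare InterruptedException)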

The second way you could solve that issue is by using one of the operators that revert back to the non-reactive world. Specifically, toIterable and toStream will both produce a blocking instance. So let's use toStream for our example:

@Test
public void blocks() {
  Flux<String> helloPauseWorld = 
    Mono.just("Hello")
        .concatWith(Mono.just("world")
                        .delaySubscriptionMillis(500));

  helloPauseWorld.toStream()
                 .forEach(System.out::println);
}

As you would expect, this prints "Hello" followed by a short pause, then prints "world" and terminates.

As we mentioned above, the RxJava amb() operator has been renamed firstEmitting (which more clearly hints at the operator's purpose: selecting the first Flux to emit). In the following example, we create a Mono whose start is delayed by 450ms and a Flux that emits its values with a 400ms pause before each value. When firstEmitting() them together, since the first value from the Flux comes in before the Mono's value, it is the Flux that ends up being played:

@Test
public void firstEmitting() {
  Mono<String> a = Mono.just("oops I'm late")
                       .delaySubscriptionMillis(450);
  Flux<String> b = Flux.just("let's get", "the party", "started")
                       .delayMillis(400);

  Flux.firstEmitting(a, b)
      .toIterable()
      .forEach(System.out::println);
}

This prints each part of the sentence with a short 400ms pause between each section.

At this point you might wonder, what if you're writing a test for a Flux that introduces delays of 4000ms instead of 400? You don't want to wait 4s in a unit test! Fortunately, we'll see in a later section that Reactor comes with powerful testing facilities that nicely cover this case.

But for now, we have sampled how Reactor compares for a few common operators, so let's zoom back and have a look at other differentiating aspects of the library.

A Java 8 foundation

Reactor targets Java 8 rather than previous Java versions. This is once again aligning with the goal of reducing the API surface: RxJava targets Java 6, where there is no java.util.function package, so classes like Function or Consumer can't be leveraged. Instead they had to add specific classes like Func1, Func2, Action0, Action1, etc. In RxJava 2 these classes mirror java.util.function the way Reactor 2 used to do when it still had to support Java 7.

The Reactor API also embraces types introduced in Java 8. Most of the time-related operators will be about a duration (e.g. timeout, interval, delay, etc.), so using the Java 8 Duration class is appropriate.

The Java 8 Stream API and CompletableFuture can also both be easily converted to a Flux/Mono, and vice-versa. Should we usually convert a Stream to a Flux though? Not really. The level of indirection added by Flux or Mono is a negligible cost when they decorate more costly operations like IO or memory-bound operations, but most of the time a Stream doesn't imply that kind of latency and it is perfectly ok to use the Stream API directly. Note that for these use cases in RxJava 2 we'd use the Observable, as it is not backpressured and thus becomes a simple push use case once you've subscribed. But Reactor is based on Java 8, and the Stream API is expressive enough for most use cases. Note also that even though you can find Flux and Mono factories for literal or simple Objects, they mostly serve the purpose of being combined in higher level flows. So typically you wouldn't want to transform an accessor like "long getCount()" into a "Mono<Long> getCount()" when migrating an existing codebase to reactive patterns.
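For completeness, those conversions look roughly like this (a short fragment; the values are illustrative):

Flux<String> fromStream = Flux.fromStream(Stream.of("a", "b", "c"));
Mono<String> fromFuture = Mono.fromFuture(CompletableFuture.completedFuture("hello"));

Stream<String> backToStream = fromStream.toStream();            // leaves the reactive world (blocking)
CompletableFuture<String> backToFuture = fromFuture.toFuture(); // back to a plain future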

The Backpressure story

One of the main focuses (if not the main focus) of the RS specification and of Reactor itself is backpressure. The idea of backpressure is that in a push scenario where the producer is quicker than the consumer, there's value in letting the consumer signal back to the producer and say "Hey! Slow down a little, I'm overwhelmed". This gives the producer a chance to control its pace rather than having to resort to discarding data (sampling) or worse, risking a cascading failure.

You may wonder at this point where backpressure comes into the picture with Mono: what kind of consumer could possibly be overwhelmed by a single emission? Short answer is "probably none". However, there's still a key difference between how a Mono works and how a CompletableFuture works. The latter is push only: if you have a reference to the Future, it means the task processing an asynchronous result is already executing. On the other hand, what a backpressured Flux or Mono enables is a deferred pull-push interaction:

  1. Deferred because nothing happens before the call to subscribe()
  2. Pull because at the subscription and request steps, the Subscriber will send a signal upstream to the source and essentially pull the next chunk of data
  3. Push from producer to consumer from there on, within the boundary of the number of requested elements

For Mono, subscribe() is the button that you press to say "I'm ready to receive my data". For Flux, this button is request(n), which is kind of a generalization of the former.
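One way to see request(n) at work is the subscribe variant that exposes the Subscription (a sketch; the numbers are arbitrary):

Flux.range(1, 100)
    .subscribe(
        value -> System.out.println("got " + value),
        Throwable::printStackTrace,
        () -> System.out.println("done"),
        subscription -> subscription.request(10)); // only pull the first 10 elements;
                                                   // no further request is made, so the other 90 are never emitted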

Realizing that Mono is a Publisher that will usually represent a costly task (in terms of IO, latency, etc.) is critical to understanding the value of backpressure here: if you don't subscribe, you don't pay the cost of that task. Since Mono will often be orchestrated in a reactive chain with regular backpressured Flux, possibly combining results from multiple asynchronous sources, the availability of this on-demand subscribe triggering is key in order to avoid blocking.

Having backpressure helps us differentiate that last use case from another broad use case of Mono: asynchronously aggregating data from a Flux into a Mono. Operators like reduce and hasElement are capable of consuming each item in the Flux, aggregating some form of data about it (respectively the result of a reduce function and a boolean) and exposing that data as a Mono. In that case, the backpressure signalled upstream is Long.MAX_VALUE, which lets the upstream work in a fully push fashion.

Another interesting aspect of backpressure is how it naturally limits the amount of objects held in memory by the stream. As a Publisher, the source of data is most probably slow (at least slowish) at producing items, so the request from downstream can very well start beyond the number of readily available items. In this case, the whole stream naturally falls into a push pattern where new items are notified to the consumer. But when there is a production peak and the pace of production accelerates, things fall nicely back into a pull model. In both cases, at most N data (the request() amount) is kept in memory.

You can reason about the memory used by your asynchronous processing by correlating that demand for N with the number of kilobytes an item consumes, W: you can then infer that at most W*N memory will be consumed. In fact, Reactor will most of the time take advantage of knowing N to apply optimizations: creating queues bounded accordingly and applying prefetching strategies where it can automatically request 75% of N every time that same ¾ amount has been received.

Finally, Reactor operators will sometimes change the backpressure signal to correlate it with the expectations and semantics they represent. One prime example of this behavior would be buffer(10): for every request of N from downstream, that operator would request 10N from upstream, which represents enough data to fill the number of buffers the subscriber is ready to consume. This is called "active backpressure", and it can be put to good use by developers in order to explicitly tell Reactor how to switch from an input volume to a different output volume, in micro-batching scenarios for instance.

Relation to Spring

Reactor is the reactive foundation for the whole Spring ecosystem, and most notably Spring 5 (through Spring Web Reactive) and Spring Data "Kay" (which corresponds to spring-data-commons 2.0).

Having a reactive version for both of these projects is essential, in the sense that this enables us to write a web application that is reactive from start to finish: a request comes in, is asynchronously processed all the way down to and including the database, and results come back asynchronously as well. This allows a Spring application to be very efficient with resources, avoiding the usual pattern of dedicating a thread to a request and blocking it for I/O.

So Reactor is going to be used for the internal reactive plumbing of future Spring applications, as well as in the APIs these various Spring components expose. More generally, they'll be able to deal with RS Publishers, but most of the time these will happen to be Flux/Mono, bringing in the rich feature set of Reactor. Of course, you will be able to use your reactive library of choice, as the framework provides hooks for  adapting between Reactor types and RxJava types or even simpler RS types.

At the time of writing of this article, you can already experiment with Spring Web Reactive in Spring Boot by using Spring Boot 2.0.0.BUILD-SNAPSHOT and the spring-boot-starter-web-reactive dependency (eg. by generating such a project on start.spring.io):

<dependency>
  <groupId>org.springframework.boot.experimental</groupId>
  <artifactId>spring-boot-starter-web-reactive</artifactId>
</dependency>

This lets you write your @Controller mostly as usual, but replaces the underlying Spring MVC traditional layer with a reactive one, replacing many of the Spring MVC contracts by reactive non-blocking ones. By default, this reactive layer is based on top of Tomcat 8.5, but you can also elect to use Undertow or Netty.

Additionally, although Spring APIs are based on Reactor types, the Spring Web Reactive module lets you use various reactive types for both the request and response:

  • Mono<T>: as the @RequestBody, the request entity T is asynchronously deserialized and you can chain your processing to the resulting mono afterward. As the return type, once the Mono emits a value, the T is serialized asynchronously and sent back to the client. You can combine both approaches by augmenting the request Mono and returning that augmented chain as the resulting Mono.
  • Flux<T>: Used in streaming scenarios (including input streaming when used as @RequestBody and Server Sent Events with a Flux<ServerSentEvent> return type)
  • Single/Observable: Same as Mono and Flux respectively, but switching to an RxJava implementation.
  • Mono<Void> as a return type: Request handling completes when the Mono completes.
  • Non-reactive return types (void and T): This now implies that your controller method is synchronous, but should be non-blocking (short-lived processing). The request handling finishes once the method is executed. The returned T is serialized back to the client asynchronously.

Here is a quick example of a plain text @Controller using the experimental web reactive module:

@RestController
public class ExampleController {

   private final MyReactiveLibrary reactiveLibrary;

   //Note: Spring 4.3+ autowires single constructors now
   public ExampleController(MyReactiveLibrary reactiveLibrary) {
      this.reactiveLibrary = reactiveLibrary;
   }

   @GetMapping("hello/{who}")
   public Mono<String> hello(@PathVariable String who) {
      return Mono.just(who)
                 .map(w -> "Hello " + w + "!");
   }

   @GetMapping("helloDelay/{who}")
   public Mono<String> helloDelay(@PathVariable String who) {
      return reactiveLibrary.withDelay("Hello " + who + "!!", 2);
   }

   @PostMapping("heyMister")
   public Flux<String> hey(@RequestBody Mono<Sir> body) {
      return Mono.just("Hey mister ")
            .concatWith(body
                .flatMap(sir -> Flux.fromArray(sir.getLastName().split("")))
                .map(String::toUpperCase)
                .take(1)
            ).concatWith(Mono.just(". how are you?"));
   }
}

The first endpoint takes a path variable, transforms it into a Mono<String> and maps that name to a greeting sentence that is returned to the client.

By doing a GET on /hello/Simon we get "Hello Simon!" as a text/plain response.

The second endpoint is a bit more complicated: it asynchronously receives a serialized Sir instance (a class simply made up of firstName and lastName attributes) and flatMaps it into a stream of the last name's letters. It then takes the first of these letters, maps it to upper case and concatenates it into a greeting sentence.

So POSTing the following JSON object to /heyMister

{
	"firstName": "Paul",
	"lastName": "tEsT"
}

Returns the string "Hey mister T. how are you?".

The reactive aspect of Spring Data is also currently being developed in the Kay release train, which for spring-data-commons is the 2.0.x branch. There is a first Milestone out that you can get by adding the Spring Data Kay-M1 bom to your pom:

<dependencyManagement>
  <dependencies>
     <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-releasetrain</artifactId>
        <version>Kay-M1</version>
        <scope>import</scope>
        <type>pom</type>
     </dependency>
  </dependencies>
</dependencyManagement>

Then for this simplistic example just add the Spring Data Commons dependency in your pom (it will take the version from the BOM above):

<dependency>
  <groupId>org.springframework.data</groupId>
  <artifactId>spring-data-commons</artifactId>
</dependency>

Reactive support in Spring Data revolves around the new ReactiveCrudRepository<T, ID> interface, which extends Repository<T, ID>. This interface exposes CRUD methods, using Reactor input and return types. There is also an RxJava 1 based version called RxJava1CrudRepository. For instance, in the classical blocking CrudRepository, retrieving one entity by its id would be done using "T findOne(ID id)". It becomes "Mono<T> findOne(ID id)" and "Observable<T> findOne(ID id)" in ReactiveCrudRepository and RxJava1CrudRepository respectively. There are even variants that take a Mono/Single as argument, to asynchronously provide the key and compose on that.
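A repository declaration can then be as simple as the following sketch (the Sir entity and the derived finder method are hypothetical):

public interface SirRepository extends ReactiveCrudRepository<Sir, String> {
    // derived query returning a reactive stream of matching entities
    Flux<Sir> findByLastName(String lastName);
}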

Assuming a reactive backing store (or a mock ReactiveCrudRepository bean), the following (very naive) controller would be reactive from start to finish:

@RestController
public class DataExampleController {

   private final ReactiveCrudRepository<Sir, String> reactiveRepository;

   //Note: Spring 4.3+ autowires single constructors now
   public DataExampleController(ReactiveCrudRepository<Sir, String> repo) {
      this.reactiveRepository = repo;
   }

   @GetMapping("data/{who}")
   public Mono<ResponseEntity<Sir>> hello(@PathVariable String who) {
      return reactiveRepository.findOne(who)
                   .map(ResponseEntity::ok)
                   .defaultIfEmpty(ResponseEntity.status(404).body(null));
   }
}

Notice how the data repository usage naturally flows into the response path: we asynchronously fetch the entity and wrap it as a ResponseEntity using map, obtaining a Mono we can return right away. If the Spring Data repository cannot find data for this key, it will return an empty Mono. We make that explicit by using defaultIfEmpty and returning a 404.

Testing Reactor

The article "Testing RxJava" covered techniques for testing an Observable. As we saw, RxJava comes with a TestScheduler that you can use with operators that accept a Scheduler as a parameter, to manipulate a virtual clock on these operators. It also features a TestSubscriberclass that can be leveraged to wait for the completion of an Observable and to make assertions about every event (number and values for onNext, has onError triggered, etc.) In RxJava 2, theTestSubscriber is an RS Subscriber, so you can test Reactor's Flux and Mono with it!

In Reactor, these two broad features are combined into the StepVerifier class. It can be found in the addon module reactor-test from the reactor-addons repository. The StepVerifier can be initialized by creating an instance from any Publisher, using the StepVerifier.create builder. If you want to use virtual time, you can use the StepVerifier.withVirtualTime builder, which takes a Supplier<Publisher>. The reason for this is that it will first ensure that a VirtualTimeScheduler is created and enabled as the default Scheduler implementation to use, making the need to explicitly pass the scheduler to operators obsolete. The StepVerifier will then configure if necessary the Flux/Mono created within the Supplier, turning timed operators into "virtually timed" operators. You can then script stream expectations and time progress: what the next elements should be, should there be an error, should it move forward in time, etc. Other methods include verifying that data matches a given Predicate or even consuming onNext events, allowing you to do more advanced interactions with the value (like using an assertion library). Any AssertionError thrown by one of these will be reflected back in the final verification result. Finally, call verify() to check your expectations; this will truly subscribe to the source defined via StepVerifier.create or StepVerifier.withVirtualTime.

Let's take a few simple examples and demonstrate how StepVerifier works. For these snippets, you'll want to add the following test dependencies to your pom:

<dependency>
  <groupId>io.projectreactor.addons</groupId>
  <artifactId>reactor-test</artifactId>
  <version>3.0.3.RELEASE</version>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.assertj</groupId>
  <artifactId>assertj-core</artifactId>
  <version>3.5.2</version>
  <scope>test</scope>
</dependency>

First, imagine you have reactive class called MyReactiveLibrary that produces a few Flux that you want to test:

@Component
public class MyReactiveLibrary {

  public Flux<String> alphabet5(char from) {
     return Flux.range((int) from, 5)
           .map(i -> "" + (char) i.intValue());
  }

  public Mono<String> withDelay(String value, int delaySeconds) {
     return Mono.just(value)
                .delaySubscription(Duration.ofSeconds(delaySeconds));
  }
}

The first method is intended to return the 5 letters of the alphabet following (and including) the given starting letter. The second method returns a Mono that emits a given value after a given delay, in seconds.

The first test we'd like to write ensures that calling alphabet5 from x limits the output to x, y, z. With StepVerifier it would go like this:

@Test
public void testAlphabet5LimitsToZ() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  StepVerifier.create(library.alphabet5('x'))
        .expectNext("x", "y", "z")
        .expectComplete()
        .verify();
}

The second test we'd like to run on alphabet5 is that every returned value is an alphabetical character. For that we'd like to use a rich assertion library like AssertJ:

@Test
public void testAlphabet5LastItemIsAlphabeticalChar() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  StepVerifier.create(library.alphabet5('x'))
              .consumeNextWith(c -> assertThat(c)
                    .as("first is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("second is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("third is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("fourth is alphabetic").matches("[a-z]"))
              .expectComplete()
              .verify();
}

Turns out both of these tests fail :(. Let's have a look at the output the StepVerifier gives us in each case to see if we can spot the bug:

java.lang.AssertionError: expected: onComplete(); actual: onNext({)

and

java.lang.AssertionError: [fourth is alphabetic] 
Expecting:
 "{"
to match pattern:
 "[a-z]"

So it looks like our method doesn't stop at z but continues emitting characters from the ASCII range. We could fix that by adding a .take(Math.min(5, 'z' - from + 1)) for instance, or using the same Math.min as the second argument to range.
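For instance, a corrected alphabet5 using that second option might look like this (a sketch mirroring the method shown earlier):

public Flux<String> alphabet5(char from) {
   // never range past 'z', however late in the alphabet we start
   return Flux.range((int) from, Math.min(5, 'z' - from + 1))
         .map(i -> "" + (char) i.intValue());
}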

The last test we want to make involves virtual time manipulation: we'll test the delaying method but without actually waiting for the given amount of seconds, by using the withVirtualTime builder:

@Test
public void testWithDelay() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  Duration testDuration =
     StepVerifier.withVirtualTime(() -> library.withDelay("foo", 30))
                 .expectSubscription()
                 .thenAwait(Duration.ofSeconds(10))
                 .expectNoEvent(Duration.ofSeconds(10))
                 .thenAwait(Duration.ofSeconds(10))
                 .expectNext("foo")
                 .expectComplete()
                 .verify();
  System.out.println(testDuration.toMillis() + "ms");
}

This tests a Mono that would be delayed by 30 seconds for the following scenario: an immediate subscription, followed by 3x10s where nothing happens, then an onNext("foo") and completion.

The System.out output prints the actual duration the verification took, which in my latest run was 8ms :)

Note that when using the create builder instead, the thenAwait and expectNoEvent methods would still be available but would actually block for the provided duration.

StepVerifier comes with many more methods for describing expectations and asserting state of a Publisher (and if you think about new ones, contributions and feedback are always welcome in the github repository).

Custom Hot Source

Note that the concept of hot and cold observables discussed at the end of "RxJava by Example" also applies to Reactor.

If you want to create a custom Flux, instead of the RxJava AsyncEmitter class, you'd use Reactor's FluxSink. This will cover all the asynchronous corner cases for you and let you focus on emitting your values.

Use Flux.create and get a FluxSink in the callback that you can use to emit data via next. This custom Flux can be cold, so in order to make it hot you can use publish() and connect(). Building on the example from the previous article with a feed of price ticks, we get an almost verbatim translation in Reactor:

SomeFeed<PriceTick> feed = new SomeFeed<>();
Flux<PriceTick> flux =
     Flux.create(emitter ->
     {
        SomeListener listener = new SomeListener() {
           @Override
           public void priceTick(PriceTick event) {
              emitter.next(event);
              if (event.isLast()) {
                 emitter.complete();
              }
           }

           @Override
           public void error(Throwable e) {
              emitter.error(e);
           }};
        feed.register(listener);
     }, FluxSink.OverflowStrategy.BUFFER);

ConnectableFlux<PriceTick> hot = flux.publish();

Before connecting to the hot Flux, why not subscribe twice?  One subscription will print the detail of each tick while the other will only print the instrument:

hot.subscribe(priceTick -> System.out.printf("%s %4s %6.2f%n", priceTick
     .getDate(), priceTick.getInstrument(), priceTick.getPrice()));

hot.subscribe(priceTick -> System.out.println(priceTick.getInstrument()));

We then connect to the hot flux and let it run for 5 seconds before our test snippet terminates:

hot.connect();
Thread.sleep(5000);

(note that in the example repository, the feed would also terminate on its own if the isLast() method of PriceTick is changed).

FluxSink also lets you check if downstream has cancelled its subscription via isCancelled(). You can also get feedback on the outstanding requested amount via requestedFromDownstream(), which is useful if you want to simply comply with backpressure. Finally, you can make sure any specific resources your source uses are released upon cancellation via setCancellation.

Note that there's a backpressure implication of using FluxSink: you must provide an OverflowStrategy explicitly to let the operator deal with backpressure. This is equivalent to using onBackpressureXXX operators (e.g. FluxSink.OverflowStrategy.BUFFER is equivalent to using .onBackpressureBuffer()), which kind of overrides any backpressure instructions from downstream.

Conclusion

In this article, you have learned about Reactor, a fourth-generation reactive library that builds on the Rx language but targets Java 8 and the Reactive Streams specification. We've shown how the concepts you might have learned in RxJava also apply to Reactor, despite a few API differences. We've also shown how Reactor serves as the foundation for Spring 5, and that it offers resources for testing a Publisher/Flux/Mono.

If you want to dig deeper into using Reactor, the snippets presented in this article are available in our github repository. There is also a workshop, the "Lite Rx API hands-on", that covers more operators and use cases.

Finally, you can reach the Reactor team on Gitter and provide feedback there or through github issues (and of course, pull-requests are welcomed as well).

 

--------------


Original article:

http://xmuzyq.javaeye.com/blog/783218

=====================================================

In high-performance I/O design there are two well-known patterns, Reactor and Proactor: Reactor is used for synchronous I/O, while Proactor is used for asynchronous I/O.

Before comparing the two patterns, we first need to clarify a few concepts: what blocking and non-blocking mean, and what synchronous and asynchronous mean. Synchronous and asynchronous describe the interaction between the application and the kernel: synchronous means the user process starts an I/O operation and then waits for it, or polls to check whether it is ready; asynchronous means the user process starts the I/O operation, goes on with its own work, and is notified when the I/O has completed. Blocking and non-blocking describe how the process accesses data depending on the readiness of the I/O operation; in plain terms, it is how the read or write call is implemented: in blocking mode the call waits until it can proceed, while in non-blocking mode it immediately returns a status value.

In general, I/O models can be classified as synchronous blocking, synchronous non-blocking, asynchronous blocking, and asynchronous non-blocking I/O.

Synchronous blocking I/O:

The user process starts an I/O operation and must wait until it completes; only when the I/O has really finished can the process continue running. Java's traditional I/O model works this way.

Synchronous non-blocking I/O:

The user process starts an I/O operation and can return to do other things, but it has to keep asking whether the I/O is ready; this constant polling wastes CPU. Java's NIO falls into this category.

Asynchronous blocking I/O:

The application starts an I/O operation and does not wait for the kernel to complete it; the kernel notifies the application when the I/O is done. That is the crucial difference between synchronous and asynchronous: synchronous must wait or actively ask whether the I/O is complete. Why call it blocking, then? Because this is done through the select system call, and select itself blocks; the advantage of select is that it can monitor many file descriptors at once, which improves concurrency.

Asynchronous non-blocking I/O:

In this model the user process only initiates an I/O operation and returns immediately; when the I/O really completes, the application is notified and only has to process the data, without performing the actual read or write, because the kernel has already done that. Java did not support this model when the original article was written (JDK 7's NIO.2 later added asynchronous channels).

With these concepts clear, let's come back to the Reactor and Proactor patterns.

First, the Reactor pattern, which applies to synchronous I/O. Let's walk through its concrete steps, taking a read and a write as examples:

Read operation:

1. The application registers a read-readiness event and the associated event handler

2. The event demultiplexer waits for events to occur

3. When a read-readiness event occurs, the event demultiplexer invokes the handler registered in step 1

4. The handler performs the actual read and then processes the content it has read

A write works like a read, except that step 1 registers a write-readiness event.

Now let's look at the read and write flow in the Proactor pattern:

Read operation:

1. The application initiates an asynchronous read and registers the corresponding event handler. Here the handler is interested in the read-completion event, not in read readiness; this is the key difference from Reactor.

2. The event demultiplexer waits for the read-completion event

3. While the demultiplexer is waiting, the operating system uses a kernel thread to perform the actual read and puts the data into a buffer passed in by the application. This is another difference from Reactor: in Proactor the application has to supply the buffer.

4. When the demultiplexer captures the read-completion event, it activates the handler registered by the application, which reads the data directly from the buffer without performing an actual read.

Writing in Proactor mirrors reading, except that the event of interest is write completion.

As the above shows, the main difference between Reactor and Proactor is who performs the actual read or write: in Reactor the application reads or writes the data itself, while in Proactor the application never performs the real I/O; it only reads from or writes to a buffer, and the operating system transfers the buffer to or from the real I/O device.

To sum up: synchronous vs. asynchronous is about how the application and the kernel interact (synchronous has to ask actively; asynchronous is notified by the kernel when the I/O event occurs), while blocking vs. non-blocking is merely about how the system call itself is implemented.


--------------

Java NIO's non-blocking support actually uses the reactor pattern, or in other words the observer pattern, to watch the I/O port for us: if data comes in, we are notified automatically, so we no longer need multiple threads waiting around, and from the outside I/O reads and writes flow smoothly without blocking.

Synchronous vs. asynchronous: whether you get a notification (or have to poll).
Blocking vs. non-blocking: whether the call waits for the result (or returns a value right away); it is purely a difference in how the call is designed.

NIO's central class is Selector, which works like an observer: we tell the Selector which SocketChannels we want to watch and then go do something else. When an event occurs it notifies us, handing back a set of SelectionKeys; from each key we get back the SocketChannel we registered, read data from that channel, and then process it.

The reactor pattern is very similar to the observer pattern in one respect: when a subject changes, all of its dependents are notified. The difference is that the observer pattern is associated with a single event source, while the reactor pattern is associated with multiple event sources.


A general model

Imagine the following situation: a long-distance bus is on the road, passengers get on and off along the way, but every passenger would like to rest on the bus.

The traditional approach: at fixed intervals (or at every stop), the driver or the conductor asks each passenger whether they are getting off.

The reactor approach: the bus is the subject the passengers interact with (the Reactor); after boarding, a passenger registers with the conductor (the acceptor) and can then go to sleep; when the bus reaches that passenger's destination, the conductor simply wakes them up.

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Reactor pattern: used to handle concurrent access by many clients.
 * 
 * An example: serving customers in a restaurant.
 * 
 * Traditional thread-pool approach: one waiter (thread) per arriving customer (request).
 * Reactor pattern approach: while a customer is choosing dishes, the waiter goes to serve
 * other customers; once the customer has decided, they simply call out for the waiter.
 * 
 * @author linxcool
 */
public class Reactor implements Runnable {
    public final Selector selector;
    public final ServerSocketChannel serverSocketChannel;

    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(
                InetAddress.getLocalHost(), port);
        serverSocketChannel.socket().bind(inetSocketAddress);
        serverSocketChannel.configureBlocking(false);

        // register the channel with the selector
        SelectionKey selectionKey = serverSocketChannel.register(selector,
                SelectionKey.OP_ACCEPT);

        // use the selectionKey's attach facility to bind an Acceptor: when an event fires, the Acceptor is dispatched
        selectionKey.attach(new Acceptor(this));
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                // if the Selector finds a channel with an OP_ACCEPT or OP_READ event, this loop runs
                while (it.hasNext()) {
                    // the first event dispatches the Acceptor;
                    // later events dispatch the attached SocketReadHandler
                    SelectionKey selectionKey = it.next();
                    dispatch(selectionKey);
                }
                // clear the ready set only after every selected key has been handled
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Run the attached Acceptor or SocketReadHandler.
     * 
     * @param key the selected key whose attachment is dispatched
     */
    void dispatch(SelectionKey key) {
        Runnable r = (Runnable) (key.attachment());
        if (r != null) {
            r.run();
        }
    }

}

  

 

import java.io.IOException;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {
    private Reactor reactor;

    public Acceptor(Reactor reactor) {
        this.reactor = reactor;
    }

    @Override
    public void run() {
        try {
            SocketChannel socketChannel = reactor.serverSocketChannel.accept();
            if (socketChannel != null) // hand the channel to a Handler
                new SocketReadHandler(reactor.selector, socketChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

  

 

 

 

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class SocketReadHandler implements Runnable {
    private SocketChannel socketChannel;

    public SocketReadHandler(Selector selector, SocketChannel socketChannel)
            throws IOException {
        this.socketChannel = socketChannel;
        socketChannel.configureBlocking(false);

        SelectionKey selectionKey = socketChannel.register(selector, 0);

        // attach this Handler to the SelectionKey: the next event on this channel
        // will invoke this class's run method (see dispatch(SelectionKey key))
        selectionKey.attach(this);

        // mark the key as interested in reads so the data can be read, then wake the selector up
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    /**
     * Read the incoming data.
     */
    @Override
    public void run() {
        ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
        inputBuffer.clear();
        try {
            socketChannel.read(inputBuffer);
            // hand the request over to a thread pool for processing, e.g.:
            // requestHandle(new Request(socket, btt));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
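To try the example out, a minimal launcher (the class name and port are illustrative, not part of the original post) just starts the reactor loop in its own thread:

import java.io.IOException;

public class ReactorMain {
    public static void main(String[] args) throws IOException {
        // start the event loop on port 9000; it accepts connections and reads client data
        new Thread(new Reactor(9000), "reactor-main").start();
    }
}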

 

 
