项目部分使用react.js
2018-04-02 14:38:06 u013828625 阅读数 243

An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events

一个对于分离和分发同步事件句柄的对象行为模式

1 Intent/意图

The Reactor design pattern handles service requests that are delivered concurrently to an application by one or moreclients. Each service in an application may consist ofserveral methods and is represented by a separate event handlerthat is responsible for dispatching service-specific requests.Dispatching of event handlers is performed by an initiationdispatcher, which manages the registered event handlers.Demultiplexing of service requests is performed by asynchronous event demultiplexer.

Reactor设计模式处理来个一个或多个客户端的向一个应用同时发起的服务请求。应用中的每个服务可能由少数几个方法组成,通过一个单独的事件处理器表现出来,这个事件处理器负责分发特定的服务请求。事件处理器的分发是通过一个初始化的分发器(initiation dispatcher)进行,负责管理注册其上的事件处理器。分离服务请求是由异步事件分离器进行的。

2 Also Known As

Dispatcher, Notifier

分发器,通知器

3 Example

To illustrate the Reactor pattern, consider the event-driven server for a distributed logging service shown in Figure 1.Client applications use the logging service to record information about their status in a distributed environment. This status information commonly includes error notifications, debugging traces, and performance reports. Logging records are sent to a central logging server, which can write there cords to various output devices, such as a console, a printer,a file, or a network management database.

为了阐述reactor模式,考虑使用分布式的日志服务(在下图1)。客户端应用程序使用这个日志服务来记录他们在分布式环境中的状态信息。这些状态信息通常包括错误通知,调试栈以及性能报告。这些日志记录会被发送到一个中央的日志服务器上,它可以将这些记录写到多个输出设备上,像控制台,打印机,文件,网络管理数据库。

The logging server shown in Figure 1 handles logging records and connection requests sent by clients. Logging records and connection requests can arrive concurrently on multiple handles. A handle identifies network communication resources managed within an OS.

图一中的日志服务器处理通过客户端发送日志记录以及连接请求。日志记录和连接请求会同时到达多个handle上,一个handle定义了在操作系统内部管理的网络通信资源。



The logging server communicates with clients using a connection-oriented protocol, such as TCP [1]. Clients that want to log data must first send a connection request to the server. The server waits for these connection requests using a handle factory that listens on an address known to clients.When a connection request arrives, the handle factory establishes a connection between the client and the server by creating a new handle that represents an endpoint of the connection.This handle is returned to the server, which then waits for client service requests to arrive on the handle. Once clients are connected, they can send logging records concurrently to the server. The server receives these records via the connected socket handles.

日志服务器和客户端使用一种面向连接的协议进行通信,例如Tcp。客户端想要发送日志数据,首先必须发送一个连接请求给服务器。服务器使用一个handle工厂来监听已经的客户端地址,然后等待连接请求的到来。当一个连接请求到达时,这个handle工厂就会通过一种创建新的handle来建立一个客户端与服务器之间的链接,这个handle表示了一个连接到端点。这个handle是被服务器返回的,然后会等待客户端服务请求到达这个handle。一旦客户端连接上了,他们就能够同时的发送日志记录给服务器,服务器通过已经连接的socket handle收到这些记录。

Perhaps the most intuitive way to develop a concurrent logging server is to use multiple threads that can process multiple clients concurrently, as shown in Figure 2. This approach synchronously accepts network connections and spawns a “thread-per-connection” to handle client logging records.

大概开发一个并发的日志服务器最直接的方式就是使用多线程的方式,它能够并发地处理多个客户端(图二),这种方法同步接收网络连接,然后产生(一个线程对应一个连接)来处理客户端的日志记录。

However, using multi-threading to implement the processing of logging records in the server fails to resolve the following forces:

  • Efficiency: Threading may lead to poor performance due to context switching, synchronization, and data movement[2];
  • Programming simplicity: Threading may require complex concurrency control schemes;
  • Portability: Threading is not available on all OS platforms.

As a result of these drawbacks, multi-threading is often not the most efficient nor the least complex solution to develop a concurrent logging server.

但是,在服务器上使用多线程来实现日志记录的处理不能解决下面的问题:

  • 性能:线程可能会导致极低的性能,因为上下文切换,同步以及数据迁移;
  • 编程简单:线程可能会要求复杂的并发控制方案;
  • 可移植性:线程可能并不适用所有的操作系统;

因为这些缺陷,多线程通常不是最有效率,也不是最小的复杂度解决方案,来开发一个并发的日志服务器。

4 Context

A server application in a distributed system that receives events from one or more clients concurrently.

在分布式系统中的一个服务应用程序会同时接收一个或者多个客户端的事件。

5 Problem

Server applications in a distributed system must handle multiple clients that send them service requests. Before invokinga specific service, however, the server application must demultiplex and dispatch each incoming request to its corresponding service provider. Developing an effective server mechanisms for demultiplexing and dispatching client requests requires the resolution of the following forces:

在分布式系统中的服务应用程序必须处理多个发送服务请求的客户端。但是在调用特定服务之前,服务应用程序必须分离和分发每个进来的请求到它的对应的服务提供者。开发一个有效率的服务器机制来分离和分发客户端请求要求下面的问题的解决方案

  • Availability: The server must be available to handle incoming requests even if it is waiting for other requests to arrive.In particular, a server must not block indefinitely handling any single source of events at the exclusion of other event sources since this may significantly delay the responseness to other clients.
  • Efficiency: A server must minimize latency, maximize throughput, and avoid utilizing the CPU(s) unnecessarily.
  • Programming simplicity: The design of a server should simplify the use of suitable concurrency strategies.
  • Adaptability: Integrating new or improved services,such as changing message formats or adding server-side caching, should incur minimal modifications and maintenance costs for existing code. For instance, implementing new application services should not require modifications to the generic event demultiplexing and dispatching mechanisms.
  • Portability: Porting a server to a new OS platform should not require significant effort.
  • 可用性:服务器必须是可用的,即使在它等待其他请求的到来的时候,也能够处理进来的请求。另外,服务器在处理任何单个的事件源时,一定不能够无限期的阻塞,把其他事件源排除在外。这样会显著的延迟发给其他客户端的额响应
  • 性能:一个服务器必须是尽可能最低的延迟,尽可能最大的吞吐量,避免不必要的CPU利用。
  • 编程简单:这个服务器的设计应该简化合适的并发策略的使用。
  • 可扩展性:集成一个新的或者改进的服务,例如改变消息格式,添加服务器端缓存,应该针对现有的代码引发最小的改动和维护成本。举个例子,实现一个新的应用服务应该不需要修改通用的时间分离和分发机制。
  • 可移植性:移植一个服务器到一个新的操作系统上应该不需要做出重大的改动。

6 Solution

Integrate the synchronous demultiplexing of events and the dispatching of their corresponding event handlers that process the events. In addition, decouple the application specific dispatching and implementation of services from the general-purpose event demultiplexing and dispatching mechanisms.

集成同步的事件分离器以及处理这些事件的事件处理器的调度。除此之外,多路事件分离和调度机制解耦了特定于应用程序的调度与服务实现。

For each service the application offers, introduce a separate Event Handler that processes certain types of events. All Event Handlers implement the same interface.Event Handlers register with an InitiationDispatcher, which uses a Synchronous EventDemultiplexer to wait for events to occur. When events occur, the Synchronous Event Demultiplexer notifies the Initiation Dispatcher, which synchronously calls back to the Event Handler associated with the event. The Event Handler then dispatches the event to the method that implements the requested service.

对于每个服务,应用程序提供,介绍了一个单独的事件处理器,用于处理某些事件类型。所有的事件处理器都实现了相同的接口。事件处理器(Event Handler)通过注册到一个初始化的分发器(InitiationDispatcher)当中,这个InitiationDispatcher使用了同步事件分离器(Synchronous Event Demultiplexer)来等待事件的发生。当事件发生时,这个同步事件分离器(Synchronous Event Demultiplexer)会通知初始化分发器(InitiationDispatcher),初始化分发器(InitiationDispatcher)会同步地回调与这个时间相关联的事件处理器(Event Handler)。这个事件处理器(Event Handler)然后会分发这个事件到实现了请求服务的方法当中。

7 Structure

The key participants in the Reactor pattern include the following:

在Reactor模式中的关键角色:

Handles

  • Identify resources that are managed by an OS.These resources commonly include network connections,open files, timers, synchronization objects, etc.Handles are used in the logging server to identify socket endpoints so that a Synchronous EventDemultiplexer can wait for events to occur on them. The two types of events the logging server is interested in are connection events and read events, which represent incoming client connections and logging data,respectively. The logging server maintains a separate connection for each client. Every connection is represented in the server by a socket handle.

句柄

  • 可以看作是操作系统管理的资源。这些资源通常包括网络连接,打开的文件,定时器,同步对象等等。句柄(handles)在这个日志服务器中用来识别socket断点,以便同步事件分离器(Synchronous EventDemultiplexer)能够在他们上面等待事件发生。这个日志服务器的两种事件类型是连接事件和读事件,直观地展示了进来的客户端连接和日志数据。日志服务器为每个客户端都保持了独立的链接,每个连接在服务器上通过一个socket handle来进行表示。

Synchronous Event Demultiplexer

  • Blocks awaiting events to occur on a set of Handles.It returns when it is possible to initiate an operation on a Handle without blocking. A common demultiplexerfor I/O events is select [1], which is an event demultiplexing system call provided by the UNIX and Win32 OS platforms. The select call indicates which Handles can have operations invoked on them synchronously without blocking the application process.

同步事件分离器

  • 在一个handle集合上阻塞地等待事件发生,当它有可能在一个handle 上开始了一个操作时,它就会返回,结束阻塞。一个常用的I/O事件分离器就是select,它是一个被UNIX和window平台提供的事件分离系统调用,这个select能够指明了在哪个handles上有操作能够被进行同步调用了,不会去阻塞应用进程。

Initiation Dispatcher

  • Defines an interface for registering, removing, and dispatching Event Handlers. Ultimately, the Synchronous Event Demultiplexer is responsible for waiting until new events occur. When it detects new events, it informs the InitiationDispatcher to call back application-specific event handlers. Common events include connection acceptance events, data input and output events, and timeout events.

初始化分发器(初始化的调度器)

  • 定义了一个注册、移除,以及调度事件处理器的接口,从根本上讲,同步事件分离器(Synchronous Event Demultiplexer)是负责等待一个新的事件发生。当它发现了一个新的事件之后,它会通知初始化的调度器(InitiationDispatcher)来回调特定于应用的事件处理器。普通事件包括连接接收事件,数据输入输出事件,超时事件。

Event Handler

  • Specifies an interface consisting of a hook method [3]that abstractly represents the dispatching operation for service-specific events. This method must be implemented by application-specific services.

事件处理器

  • 指一个由回调钩子方法组成的接口,能够抽象地表示面向服务的事件的调度操作。这个方法必须被面向应用的服务所实现。

Concrete Event Handler

  • Implements the hook method, as well as the methods to process these events in an application-specific manner. Applications register Concrete EventHandlers with the Initiation Dispatcher to process certain types of events. When these events arrive,the Initiation Dispatcher calls back the hook method of the appropriate Concrete EventHandler.

    There are two Concrete Event Handlers in the logging server: Logging Handler and Logging Acceptor. The Logging Handler is responsiblefor receiving and processing logging records. The Logging Acceptor creates and connects LoggingHandlers that process subsequent logging records from clients.

具体的事件处理器

  • 实现了回调钩子方法,也就是以一种面向应用的方法处理这些事件的方法。应用程序注册具体的事件处理器(Concrete EventHandlers)到初始化调度器(Initiation Dispatcher)上,用来处理某一类事件。当这些事件到达时,初始化调度器(Initiation Dispatcher)就会回调合适的具体的事件处理器(Concrete EventHandler)中的钩子方法。
  • 在日志服务器上有两种具体事件处理器(Concrete Event Handlers):日志处理器(Logging Handler)和日志接收器(Logging Acceptor)。日志处理器(Logging Handler)是用来接收和处理日志记录的。日志接收器(Logging Acceptor)是用来创建和连接日志处理器(Logging Handler),处理后面来自客户端的日志记录。

The structure of the participants of the Reactor pattern is illustrated in the following OMT class diagram:

Reactor模式的各个角色的架构在下面的OMT类图中进行阐述:


8 Dynamics

8.1 General Collaborations(通常的协作)

The following collaborations occur in the Reactor pattern:

  • When an application registers a Concrete Event Handler with the Initiation Dispatcher the application indicates the type of event(s) this Event Handler wants the Initiation Dispatcher to notify it about when the event(s) occur on the associated Handle.
  • 当一个应用程序将一个具体的事件处理器(Concrete Event Handle)注册到初始化调度器(Initiation Dispatcher)上时,这个应用会指定事件类型,当相关的句柄(handle)上发生这个类型的事件时,事件处理器( Event Handler)想要初始化调度器(Initiation Dispatcher)通知它。
  • The Initiation Dispatcher requests each Event Handler to pass back its internal Handle. This Handle identifies the Event Handler to the OS.
  • 初始化调度器( Initiation Dispatcher)要求每个事件处理器( Event Handler )回传它内部的句柄(handle)。这个句柄(handle)向操作系统标识了事件处理器(Event Handler)。
  • After all Event Handlers are registered, an application calls handle_events to start the Initiation Dispatcher's event loop. At this point, the Initiation Dispatcher combines the Handle from each registeredEvent Handler and uses the Synchronous Event Demultiplexer to wait for events to occur on these Handles. For instance, the TCP protocol layer uses the select synchronous event demultiplexing operation to wait for client logging record events to arrive on connected socket Handles.
  • 在所有的事件处理器(Event Handlers)都注册完毕之后,应用程序会调用handle_events 方法来开始这个初始化调度器( Initiation Dispatcher)的事件循环。在这个点上,初始化调度器(Initiation Dispatcher )组合了从每个已经注册的事件处理器(Event Handler)上回传的handle,并且使用同步事件分离器(Synchronous Event Demultiplexer)来等待在这些句柄(handles)上事件的发生。例如,TCP协议层使用select 同步事件分离器操作来等待客户端到达已连接的套接字(socket)句柄的日志记录事件。
  • The Synchronous Event Demultiplexer notifies the Initiation Dispatcher when a Handle corresponding to an event source becomes “ready,” e.g., that a TCP socket is “ready for reading.”
  • 当一个句柄(handle)变成准备状态,同步事件分离器(Synchronous Event Demultiplexer)通知初始化调度器( Initiation Dispatcher
  • The Initiation Dispatcher triggers Event Handler hook method in response to events on the readyHandles. When events occur, the Initiation Dispatcher uses the Handles activated by the event sources as “keys” to locate and dispatch the appropriate Event Handler's hook method.
  • 初始化调度器( Initiation Dispatcher)触发事件处理器的钩子方法来响应在句柄(handle)上准备好的事件。当事件发生时,初始化调度器( Initiation Dispatcher )使用被事件源激活的句柄(handle)作为定位和调度正确的事件处理的回调方法的key。
  • The Initiation Dispatcher calls back to the handle_event hook method of the Event Handler to perform application-specific functionality in response to an event. The type of event that occurred can be passed as a parameter to the method and used internally by this method to perform additional service specific demultiplexing and dispatching. An alternative dispatching approach is described in Section 9.4.
  • 初始化调度器(Initiation Dispatcher )回调事件处理器(Event Handler)当中的handle_event的钩子方法,来执行特定于程序的功能,以响应一个事件。发生的事件的类型能够作为参数传递到这个方法,然后在这个方法内部使用,以完成附加的业务功能

The following interaction diagram illustrates the collaboration between application code and participants in the Reactor pattern:

下面的交互类图阐述了应用代码和在Reactor模式中各个角色的协作关系:




项目部分使用react.js 相关内容

2018-06-13 16:13:29 shujudeliu 阅读数 7593

.net reactor的使用

.NET Reactor 是一款强大的 .NET 代码混淆加密保护工具,常用于防止对 .NET 程序的反编译破解等场景。

本文以.net reactor加密dll为示例:

1.新建一个类库项目,并创建一个类,本文以对称加密算法为示例

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;

namespace PP.Encrypt
{
    //密码生成:https://suijimimashengcheng.51240.com/
    public class SymmetricMethod
    {
        private SymmetricAlgorithm mobjCryptoService;
        private string Key;
        /// <summary>   
        /// 对称加密类的构造函数   
        /// </summary>   
        public SymmetricMethod()
        {
            mobjCryptoService = new RijndaelManaged();
            Key = "FefZ$@pAedzg#HjT!QcM7JQqwOcAkCm7x2pZjBUMSocM9v6#%AP9HZg7OZ^ogG!x";
        }
        /// <summary>   
        /// 获得密钥   
        /// </summary>   
        /// <returns>密钥</returns>   
        private byte[] GetLegalKey()
        {
            string sTemp = Key;
            mobjCryptoService.GenerateKey();
            byte[] bytTemp = mobjCryptoService.Key;
            int KeyLength = bytTemp.Length;
            if (sTemp.Length > KeyLength)
                sTemp = sTemp.Substring(0, KeyLength);
            else if (sTemp.Length < KeyLength)
                sTemp = sTemp.PadRight(KeyLength, ' ');
            return ASCIIEncoding.ASCII.GetBytes(sTemp);
        }
        /// <summary>   
        /// 获得初始向量IV   
        /// </summary>   
        /// <returns>初试向量IV</returns>   
        private byte[] GetLegalIV()
        {
            string sTemp = "XUYXqW8QF2fqyytf0ZwU6Vv1cbNI3qU!zVzohQ0ptAug#&uJ3b^rEKkrckH1LE3i";
            mobjCryptoService.GenerateIV();
            byte[] bytTemp = mobjCryptoService.IV;
            int IVLength = bytTemp.Length;
            if (sTemp.Length > IVLength)
                sTemp = sTemp.Substring(0, IVLength);
            else if (sTemp.Length < IVLength)
                sTemp = sTemp.PadRight(IVLength, ' ');
            return ASCIIEncoding.ASCII.GetBytes(sTemp);
        }
        /// <summary>   
        /// 加密方法   
        /// </summary>   
        /// <param name="Source">待加密的串</param>   
        /// <returns>经过加密的串</returns>   
        public string Encrypto(string Source)
        {
            byte[] bytIn = UTF8Encoding.UTF8.GetBytes(Source);
            MemoryStream ms = new MemoryStream();
            mobjCryptoService.Key = GetLegalKey();
            mobjCryptoService.IV = GetLegalIV();
            ICryptoTransform encrypto = mobjCryptoService.CreateEncryptor();
            CryptoStream cs = new CryptoStream(ms, encrypto, CryptoStreamMode.Write);
            cs.Write(bytIn, 0, bytIn.Length);
            cs.FlushFinalBlock();
            ms.Close();
            byte[] bytOut = ms.ToArray();
            return Convert.ToBase64String(bytOut);
        }
        /// <summary>   
        /// 解密方法   
        /// </summary>   
        /// <param name="Source">待解密的串</param>   
        /// <returns>经过解密的串</returns>   
        public string Decrypto(string Source)
        {
            byte[] bytIn = Convert.FromBase64String(Source);
            MemoryStream ms = new MemoryStream(bytIn, 0, bytIn.Length);
            mobjCryptoService.Key = GetLegalKey();
            mobjCryptoService.IV = GetLegalIV();
            ICryptoTransform encrypto = mobjCryptoService.CreateDecryptor();
            CryptoStream cs = new CryptoStream(ms, encrypto, CryptoStreamMode.Read);
            StreamReader sr = new StreamReader(cs);
            return sr.ReadToEnd();
        }
    }
}

2.安装.net reactor5.0

如果要单独加密某个dll,单击“open”选择要加密的dll,然后在下边"Quick Settings"里边选择加密的选项即可,使用菜单里的“Protection Presets”可以快速设定"Quick Settings"里的选项,本例使用类库强加密,如图:

然后切换到Protection选项卡,点击“Protect”按钮即可生成加密后的dll

执行加密保护操作以后,默认会在原dll文件夹下创建一个后缀名_Secure的文件夹,里边就是加密以后的dll了。

此时使用.net reflector反编译工具查看生成的dll,即可发现无法直接看到相关方法信息了,如图:

3.在VS项目中配置自动加密

这种手动加密的方法固然可以,不过实际使用中却不是很方便,毕竟需要手动加密的。.net reactor强大的地方在于可以在VS项目生成的时候自动加密。

打开.Net Reactor 5.0,单击Help菜单,选择开发使用的VS版本,这个操作会在VS中安装.net reactor插件

打开VS,点击“工具”菜单,可以看到.net reactor插件已安装

点击上图中的.Net Reactor

随后点击生成项目,会跳出加密的画面:

这个加密画面只有修改了.net reactor插件中的配置后第一次生成时候可以明显看到,随后再执行生成都是一闪而过,几乎感觉不到,可以说是非常便捷的。

.net reactor功能非常强大,还可以用于将站点、服务、类库文件打包成一个文件等等,本文只是介绍了简单的使用。

 

此外,混淆加壳和逆向脱壳破解,本身就是矛和盾之争,安全的概念只是相对的。比如一款叫de4dot的逆向工具,就可以对大部分.net reactor加密的程序进行脱壳破解(当然需要懂一些逆向的知识),大家使用时候自己有所衡量就可以了。

 

项目部分使用react.js 相关内容

2018-04-26 01:50:00 weixin_34050389 阅读数 6

Reactor Core

Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactive Extensions inspired API and efficient event streaming support.

Getting it

Reactor 3 requires Java 8 or + to run.

With Gradle from repo.spring.io or Maven Central repositories (stable releases only):

    repositories {
//      maven { url 'http://repo.spring.io/snapshot' }
      maven { url 'http://repo.spring.io/milestone' }
      mavenCentral()
    }

    dependencies {
      //compile "io.projectreactor:reactor-core:3.1.4.RELEASE"
      //testCompile("io.projectreactor:reactor-test:3.1.4.RELEASE")
      compile "io.projectreactor:reactor-core:3.2.0.M1"
      testCompile("io.projectreactor:reactor-test:3.2.0.M1")
    }

See the reference documentation
for more information on getting it (eg. using Maven, or on how to get milestones and snapshots).

Note about Android support: Reactor 3 doesn't officially support nor target Android.
However it should work fine with Android SDK 26 (Android O) and above. See the
complete note
in the reference guide.

Getting Started

New to Reactive Programming or bored of reading already ? Try the Introduction to Reactor Core hands-on !

If you are familiar with RxJava or if you want to check more detailled introduction, be sure to check
https://www.infoq.com/articles/reactor-by-example !

Flux

A Reactive Streams Publisher with basic flow operators.

  • Static factories on Flux allow for source generation from arbitrary callbacks types.
  • Instance methods allows operational building, materialized on each Flux#subscribe(), Flux#subscribe() or multicasting operations such as Flux#publish and Flux#publishNext.

<img src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/flux.png" width="500">

Flux in action :

Flux.fromIterable(getSomeLongList())
    .mergeWith(Flux.interval(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .subscribe(System.out::println);

Mono

A Reactive Streams Publisher constrained to ZERO or ONE element with appropriate operators.

  • Static factories on Mono allow for deterministic zero or one sequence generation from arbitrary callbacks types.
  • Instance methods allows operational building, materialized on each Mono#subscribe() or Mono#get() eventually called.

<img src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/mono.png" width="500">

Mono in action :

Mono.fromCallable(System::currentTimeMillis)
    .flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time)))
    .timeout(Duration.ofSeconds(3), errorHandler::fallback)
    .doOnSuccess(r -> serviceM.incrementSuccess())
    .subscribe(System.out::println);

Blocking Mono result :

Tuple2<Long, Long> nowAndLater = 
        Mono.zip(
                Mono.just(System.currentTimeMillis()),
                Flux.just(1).delay(1).map(i -> System.currentTimeMillis()))
            .block();

Schedulers

Reactor uses a Scheduler as a
contract for arbitrary task execution. It provides some guarantees required by Reactive
Streams flows like FIFO execution.

You can use or create efficient schedulers
to jump thread on the producing flows (subscribeOn) or receiving flows (publishOn):


Mono.fromCallable( () -> System.currentTimeMillis() )
    .repeat()
    .publishOn(Schedulers.single())
    .log("foo.bar")
    .flatMap(time ->
        Mono.fromCallable(() -> { Thread.sleep(1000); return time; })
            .subscribeOn(Schedulers.parallel())
    , 8) //maxConcurrency 8
    .subscribe();

ParallelFlux

ParallelFlux can starve your CPU's from any sequence whose work can be subdivided in concurrent
tasks. Turn back into a Flux with ParallelFlux#sequential(), an unordered join or
use abitrary merge strategies via 'groups()'.

Mono.fromCallable( () -> System.currentTimeMillis() )
    .repeat()
    .parallel(8) //parallelism
    .runOn(Schedulers.parallel())
    .doOnNext( d -> System.out.println("I'm on thread "+Thread.currentThread()) )
    .subscribe()

Custom sources : Flux.create and FluxSink, Mono.create and MonoSink

To bridge a Subscriber or Processor into an outside context that is taking care of
producing non concurrently, use Flux#create, Mono#create.

Flux.create(sink -> {
         ActionListener al = e -> {
            sink.next(textField.getText());
         };

         // without cancellation support:
         button.addActionListener(al);

         // with cancellation support:
         sink.onCancel(() -> {
            button.removeListener(al);
         });
    },
    // Overflow (backpressure) handling, default is BUFFER
    FluxSink.OverflowStrategy.LATEST)
    .timeout(3)
    .doOnComplete(() -> System.out.println("completed!"))
    .subscribe(System.out::println)

The Backpressure Thing

Most of this cool stuff uses bounded ring buffer implementation under the hood to mitigate signal processing difference between producers and consumers. Now, the operators and processors or any standard reactive stream component working on the sequence will be instructed to flow in when these buffers have free room AND only then. This means that we make sure we both have a deterministic capacity model (bounded buffer) and we never block (request more data on write capacity). Yup, it's not rocket science after all, the boring part is already being worked by us in collaboration with Reactive Streams Commons on going research effort.

What's more in it ?

"Operator Fusion" (flow optimizers), health state observers, helpers to build custom reactive components, bounded queue generator, hash-wheel timer, converters from/to Java 9 Flow, Publisher and Java 8 CompletableFuture. The repository contains a reactor-test project with test features like the StepVerifier.


Reference Guide

http://projectreactor.io/docs/core/release/reference/docs/index.html

Javadoc

https://projectreactor.io/docs/core/release/api/

Getting started with Flux and Mono

https://github.com/reactor/lite-rx-api-hands-on

Reactor By Example

https://www.infoq.com/articles/reactor-by-example

Head-First Spring & Reactor

https://github.com/reactor/head-first-reactive-with-spring-and-reactor/

Beyond Reactor Core

  • Everything to jump outside the JVM with the non-blocking drivers from Reactor Netty.
  • Reactor Addons provide for adapters and extra operators for Reactor 3.

Powered by Reactive Streams Commons

Licensed under Apache Software License 2.0

Sponsored by Pivotal

项目部分使用react.js 相关内容

2018-04-26 01:50:00 universsky2015 阅读数 57

Reactor Core

Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactive Extensions inspired API and efficient event streaming support.

Getting it

Reactor 3 requires Java 8 or + to run.

With Gradle from repo.spring.io or Maven Central repositories (stable releases only):

    repositories {
//      maven { url 'http://repo.spring.io/snapshot' }
      maven { url 'http://repo.spring.io/milestone' }
      mavenCentral()
    }

    dependencies {
      //compile "io.projectreactor:reactor-core:3.1.4.RELEASE"
      //testCompile("io.projectreactor:reactor-test:3.1.4.RELEASE")
      compile "io.projectreactor:reactor-core:3.2.0.M1"
      testCompile("io.projectreactor:reactor-test:3.2.0.M1")
    }

See the reference documentation
for more information on getting it (eg. using Maven, or on how to get milestones and snapshots).

Note about Android support: Reactor 3 doesn't officially support nor target Android.
However it should work fine with Android SDK 26 (Android O) and above. See the
complete note
in the reference guide.

Getting Started

New to Reactive Programming or bored of reading already ? Try the Introduction to Reactor Core hands-on !

If you are familiar with RxJava or if you want to check more detailled introduction, be sure to check
https://www.infoq.com/articles/reactor-by-example !

Flux

A Reactive Streams Publisher with basic flow operators.

  • Static factories on Flux allow for source generation from arbitrary callbacks types.
  • Instance methods allows operational building, materialized on each Flux#subscribe(), Flux#subscribe() or multicasting operations such as Flux#publish and Flux#publishNext.

<img src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/flux.png" width="500">

Flux in action :

Flux.fromIterable(getSomeLongList())
    .mergeWith(Flux.interval(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .subscribe(System.out::println);

Mono

A Reactive Streams Publisher constrained to ZERO or ONE element with appropriate operators.

  • Static factories on Mono allow for deterministic zero or one sequence generation from arbitrary callbacks types.
  • Instance methods allows operational building, materialized on each Mono#subscribe() or Mono#get() eventually called.

<img src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/mono.png" width="500">

Mono in action :

Mono.fromCallable(System::currentTimeMillis)
    .flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time)))
    .timeout(Duration.ofSeconds(3), errorHandler::fallback)
    .doOnSuccess(r -> serviceM.incrementSuccess())
    .subscribe(System.out::println);

Blocking Mono result :

Tuple2<Long, Long> nowAndLater = 
        Mono.zip(
                Mono.just(System.currentTimeMillis()),
                Flux.just(1).delay(1).map(i -> System.currentTimeMillis()))
            .block();

Schedulers

Reactor uses a Scheduler as a
contract for arbitrary task execution. It provides some guarantees required by Reactive
Streams flows like FIFO execution.

You can use or create efficient schedulers
to jump thread on the producing flows (subscribeOn) or receiving flows (publishOn):


Mono.fromCallable( () -> System.currentTimeMillis() )
    .repeat()
    .publishOn(Schedulers.single())
    .log("foo.bar")
    .flatMap(time ->
        Mono.fromCallable(() -> { Thread.sleep(1000); return time; })
            .subscribeOn(Schedulers.parallel())
    , 8) //maxConcurrency 8
    .subscribe();

ParallelFlux

ParallelFlux can starve your CPU's from any sequence whose work can be subdivided in concurrent
tasks. Turn back into a Flux with ParallelFlux#sequential(), an unordered join or
use abitrary merge strategies via 'groups()'.

Mono.fromCallable( () -> System.currentTimeMillis() )
    .repeat()
    .parallel(8) //parallelism
    .runOn(Schedulers.parallel())
    .doOnNext( d -> System.out.println("I'm on thread "+Thread.currentThread()) )
    .subscribe()

Custom sources : Flux.create and FluxSink, Mono.create and MonoSink

To bridge a Subscriber or Processor into an outside context that is taking care of
producing non concurrently, use Flux#create, Mono#create.

Flux.create(sink -> {
         ActionListener al = e -> {
            sink.next(textField.getText());
         };

         // without cancellation support:
         button.addActionListener(al);

         // with cancellation support:
         sink.onCancel(() -> {
            button.removeListener(al);
         });
    },
    // Overflow (backpressure) handling, default is BUFFER
    FluxSink.OverflowStrategy.LATEST)
    .timeout(3)
    .doOnComplete(() -> System.out.println("completed!"))
    .subscribe(System.out::println)

The Backpressure Thing

Most of this cool stuff uses bounded ring buffer implementation under the hood to mitigate signal processing difference between producers and consumers. Now, the operators and processors or any standard reactive stream component working on the sequence will be instructed to flow in when these buffers have free room AND only then. This means that we make sure we both have a deterministic capacity model (bounded buffer) and we never block (request more data on write capacity). Yup, it's not rocket science after all, the boring part is already being worked by us in collaboration with Reactive Streams Commons on going research effort.

What's more in it ?

"Operator Fusion" (flow optimizers), health state observers, helpers to build custom reactive components, bounded queue generator, hash-wheel timer, converters from/to Java 9 Flow, Publisher and Java 8 CompletableFuture. The repository contains a reactor-test project with test features like the StepVerifier.


Reference Guide

http://projectreactor.io/docs/core/release/reference/docs/index.html

Javadoc

https://projectreactor.io/docs/core/release/api/

Getting started with Flux and Mono

https://github.com/reactor/lite-rx-api-hands-on

Reactor By Example

https://www.infoq.com/articles/reactor-by-example

Head-First Spring & Reactor

https://github.com/reactor/head-first-reactive-with-spring-and-reactor/

Beyond Reactor Core

  • Everything to jump outside the JVM with the non-blocking drivers from Reactor Netty.
  • Reactor Addons provide for adapters and extra operators for Reactor 3.

Powered by Reactive Streams Commons

Licensed under Apache Software License 2.0

Sponsored by Pivotal

项目部分使用react.js 相关内容

2013-10-21 18:19:01 wang1144 阅读数 896
(2011-03-13 17:55:36)
标签:

twisted

reactor

杂谈

分类:python学习

作者:dave@http://krondo.com/?p=1247译者:杨晓伟(采用意译)

第二部分:低效的诗歌服务器来启发对Twisted机制的理解

这个系列是从这里开始的,欢迎你再次来到这里来。现在我们可能要写一些代码。在开始之前,我们都做出一些必要的假设。

关于对你的假设

在展开讨论前,我假设你已经有过用Python写同步程序的经历并且至少知道一点有关PythonSockt编程的经验。如果你从没有写过Socket程序,或许你可以去看看Socket模块的文档,尤其是后面的示例代码。如果你没有用过Python的话,那后面的描述对你来说可能比看周易还痛苦。


你所使用的计算机的情况(想的真周到,:)

我一般是在Linux上使用Twisted,这个系列的示例代码也是在Linux下完成的。首先声明的是我并没有故意让代码失去平台无关性,但我所讲述的一些内容确实可能仅仅适应于Linux和其它的类Unix(比如MACOSXFreeBSD)。WIndows是个奇怪诡异的地方(??为什么这么评价Windows呢),如果你想尝试在它上面学习这个系列,抱歉,如果出了问题,我无法提供任何帮助。

并且假设你已经安装了近期版本的PythonTwisted。我所提供的示例示例代码是基于Python2.5Twisted8.2.0

你可以在单机上运行所有的示例代码,也可以在网络系统上运行它们。但是为了学习异步编程的机制,单机上学习是比较理想的。


获取代码的方法

使用git工具来获取Dave的最新示例代码。在shell或其它命令行上输入以下命令(假设已经安装git):

git clonegit://github.com/jdavisp3/twisted-intro.git

下载结束后,解压并进入第一层文件夹(你可以看到有个README文件)。


低效的诗歌服务器

虽然CPU的处理速度远远快于网络,但网络的处理速度仍然比人脑快,至少比人类的眼睛快。因此,想通过网络来获得CPU的视角是很困难的,尤其是在单机的回环模式中数据流全速传输时,更是困难重重。

我们所需要的是一个慢速低效诗歌服务器,其用人为的可变延时来体现影响结果。毕竟服务器要提供点东西吗,我们就提供诗歌好了。目录下面有个子目录专门存放诗歌用的。

最简单的慢速诗歌服务器在blocking-server/slowpoetry.py中实现。你可用下面的方式来运行它。

python blocking-server/slowpoetry.py poetry/ecstasy.txt

上面这个命令将启动一个阻塞的服务器,其提供“Ecstasy”这首诗。现在我们来看看它的源码内容,正如你所见,这里面并没有使用任何Twisted的内容,只是最基本的Socket编程操作。它每次只发送一定字节数量的内容,而每次中间延时一段时间。默认的是每隔0.1秒发送10个比特,你可以通过-delay

-num-bytes参数来设置。例如每隔5秒发送50比特:

          pythonblocking-server/slowpoetry.py --num-bytes 50 –delay5poetry/ecstasy.txt

当服务器启动时,它会显示其所监听的端口号。默认情况下,端口号是在可用端口号池中随机选择的。你可能想使用固定的端口号,那么无需更改代码,只需要在启动命令中作下修改就OK了,如下所示:

python blocking-server/slowpoetry.py --port 10000 poetry/ecstasy.txt

如果你装有netcat工具,可以用如下命令来测试你的服务器(也可以用telnet):

netcat localhost 10000

如果你的服务器正常工作,那么你就可以看到诗歌在你的屏幕上慢慢的打印出来。对!你会注意到每次服务器都会发送过一行的内容过来。一旦诗歌传送完毕,服务器就会关闭这条连接。

默认情况下,服务器只会监听本地回环的端口。如果你想连接另外一台机子的服务器,你可以指定其IP地址内容,命令行参数是 -iface选项。

不仅是服务器在发送诗歌的速度慢,而且读代码可以发现,服务器在服务一个客户端时其它连接进来的客户端只能处于等待状态而得不到服务。这的确是一个低效慢速的服务器,要不是为了学习,估计没有任何其它用处。


阻塞模式的客户端

在示例代码中有一个可以从多个服务器中顺序(一个接一个)地下载诗歌的阻塞模式的客户端。下面让这个客户端执行三个任务,正如第一个部分图1描述的那样。首先我们启动三个服务器,提供三首不同的诗歌。在命令行中运行下面三条命令:

python blocking-server/slowpoetry.py --port 10000 poetry/ecstasy.txt --num-bytes 30
python blocking-server/slowpoetry.py --port 10001 poetry/fascination.txt
python blocking-server/slowpoetry.py --port 10002 poetry/science.txt

如果在你的系统中上面那些端口号有正在使用中,可以选择其它没有被使用的端口。注意,由于第一个服务器发送的诗歌是其它的三倍,这里我让第一个服务器使用每次发送30个字节而不是默认的10个字节,这样一来就以3倍于其它服务器的速度发送诗歌,因此它们会在几乎相同的时间内完成工作。

现在我们使用阻塞模式的客户端来获取诗歌,运行如下所示的命令:

python blocking-client/get-poetry.py 10000 10001 10002

如果你修改了上面服务口器的端口,你需要在这里时行相应的修改以保持一致。由于这个客户端采用的是阻塞模式,因此它会一首一首的下载,即只有在完成一首时才会开始下载另外一首。这个客户端会像下面这样打印出提示信息而不是将诗歌打印出来:

Task 1: get poetry from: 127.0.0.1:10000
Task 1: got 3003 bytes of poetry from 127.0.0.1:10000 in 0:00:10.126361 
Task 2: get poetry from: 127.0.0.1:10001 
Task 2: got 623 bytes of poetry from 127.0.0.1:10001 in 0:00:06.321777
Task 3: get poetry from: 127.0.0.1:10002 
Task 3: got 653 bytes of poetry from 127.0.0.1:10002 in 0:00:06.617523
Got 3 poems in 0:00:23.065661

这图1最典型的文字版了,每个任务下载一首诗歌。你运行后可能显示的时间会与上面有所差别,并且也会随着你改变服务器的发送时间参数而改变。尝试着更改一下参数来观测一下效果。


异步模式的客户端

现在,我们来看看不用Twisted构建的异步模式的客户端。首先,我们先运行它试试。启动使用前面的三个端口来启动三个服务器。如果前面开启的还没有关闭,那就继续用它们好了。接下来,我们通过下面这段命令来启动我们的异步模式的客户端:

python async-client/get-poetry.py 10000 10001 10002 
 
你或许会得到类似于下面的输出:

Task 1: got 30 bytes of poetry from127.0.0.1:10000 

Task 2: got 10 bytes of poetry from127.0.0.1:10001

Task 3: got 10 bytes of poetry from127.0.0.1:10002

Task 1: got 30 bytes of poetry from127.0.0.1:10000 

Task 2: got 10 bytes of poetry from127.0.0.1:10001

...

Task 1: 3003 bytes ofpoetry

Task 2: 623 bytes ofpoetry

Task 3: 653 bytes ofpoetry

Got 3 poems in0:00:10.133169

这次的输出可能会比较长,这是由于在异步模式的客户端中,每次接收到一段服务器发送来的数据都要打印一次提示信息,而服务器是将诗歌分成若干片段发送出去的。值得注意的是,这些任务相互交错执行,正如第一部分图3所示。

尝试着修改服务器的设置(如将一个服务器的延时设置的长一点),来观察一下异步模式的客户端是如何针对变慢的服务器自动调节自身的下载来与较快的服务器保持一致。这正是异步模式在起作用。

还需要值得注意的是,根据上面的设置,异步模式的客户端仅在10秒内完成工作,而同步模式的客户端却使用了23秒。现在回忆一下第一部分中图3与图4.通过减少阻塞时间,我们的异步模式的客户端可以在更短的时间里完成下载。诚然,我们的异步客户端也有些阻塞发生,那是由于服务器太慢了。由于异步模式的客户端可以在不同的服务器来回切换,它比同步模式的客户产生的阻塞就少得多。


更近一步的观察

现在让我们来读一下异步模式客户端的代码。注意其与同步模式客户端的差别:

 

1.异步模式客户端一次性与全部服务器完成连接,而不像同步模式那样一次只连接一个。

2.用来进行通信的Socket方法是非阻塞模的,这是通过调用setblocking(0来实现的。

3.select模块中的select方法是用来识别是其监视的socket是否有完成数据接收的,如果没有

它就处于阻塞状态。

4.当从服务器中读取数据时,会尽量多地从Sockt读取数据直到它阻塞为止,然后读下一个Sockt

接收的数据(如果有数据接收的话)。这意味着我们需要跟踪记录从不同服务器传送过来诗歌的接

收情况(因为,一首诗的接收并不是连续完成,所以需要保证每个任务的可连续性,就得有冗余

的信息来完成这一工作)。

异步模式中客户端的核心就是最高层的循环体,即get_poetry函数。这个函数可以被拆分成两个步骤:

1.使用select函数等待所有Socket,直到至少有一个socket有数据到来。

2.对每个有数据需要读取的socket,从中读取数据。但仅仅只是读取有效数据,不能为了等待还没

来到的数据而发生阻塞。

3.重复前两步,直到所有的socket被关闭。


可以看出,同步模式客户端也有个循环体(在main函数内),但是这个循环体的每个迭代都是完成一首诗的下载工作。而在异步模式客户端的每次迭代过程中,我们可以完成所有诗歌的下载或者是它们中的一些。我们并不知道在一个迭代过程中,在下载那首诗,或者一次迭代中我们下载了多少数据。这些都依赖于服务器的发送速度与网络环境。我们只需要select函数告诉我们那个socket有数据需要接收,然后在保证不阻塞程序的前提下从其读取尽量多的数据。

如果在服务器端口固定的条件下,同步模式的客户端并不需要循环体,只需要顺序罗列三个get_poetry

就可以了。但是我们的异步模式的客户端必须要有一个循环体来保证我们能够同时监视所有的socket端。这样我们就能在一次循环体中处理尽可能多的数据。

这个利用循环体来等待事件发生,然后处理发生的事件的模型非常常见,而被设计成为一个模式:reactor模式。其图形化表示如图5所示:

第二部分:异步编程初探与reactor模式


这个循环就是个”reactor“(反应堆),因为它等待事件的发生然对其作为相应的反应。正因为如此,它也被称作事件循环。由于交互式系统都要进行I/O操作,因此这种循环也有时被称作selectloop,这是由于select调用被用来等待I/O操作。因此,在本程序中的select循环中,一个事件的发生意味着一个socket端处有数据来到。值得注意的是,select并不是唯一的等待I/O操作的函数,它仅仅是一个比较古老的函数而已(因此才被用的如此广泛)。现在有一些新API可以完成select的工作而且性能更优,它们已经在不同的系统上实现了。不考虑性能上的因素,它们都完成同样的工作:监视一系列sockets(文件描述符)并阻塞程序,直到至少有一个准备好时行I/O操作。


严格意义上来说,我们的异步模式客户端中的循环并不是reactor模式,因为这个循环体并没有独立于业务处理(在此是接收具体个服务器传送来的诗歌)之外。它们被混合在一起。一个真正reactor模式的实现是需要实现循环独立抽象出来并具有如下的功能:

 

1.监视一系列与你I/O操作相关的文件描述符(description)

2.不停地向你汇报那些准备好I/O操作的文件描述符


一个设计优秀的reactor模式实现需要做到:

1.处理所有不同系统会出现的I/O事件

2.提供优雅的抽象来帮助你在使用reactor时少花些心思去考虑它的存在

3.提供你可以在抽象层外(treactor实现)使用的公共协议实现。


好了,我们上面所说的其实就是Twisted—健壮、跨平台实现了reactor模式并含有很多附加功能。

在第三部分中,实现Twisted版的下载诗歌服务时,我们将开始写一些简单的Twisted程序。

项目部分使用react.js 相关内容

没有更多推荐了,返回首页