atom react 配置
2017-06-29 21:30:43 hxpjava1 阅读数 426

Reactor简介

Reactor是一个基础库,用在构建实时数据流应用、要求有容错和低延迟至毫秒、纳秒、皮秒的服务。

— PrefaceTL;DR

什么是Reactor?

  让我们大致了解一下Reactor。在你使用喜欢的搜索敲入一些关键词如Reactive、spring Reactive、Asynchronous Java或者仅仅是”What the heck is Reactor?”.简而言之,Reactor是一个轻量级的JVM基础库,它可以帮助我们构建的服务和应用高效而异步的传递消息。

高效的含义是什么呢? 
传递一个消息从A到B时GC产生的内存很小或者完全没有。 
当消费者处理消息的速度低于生产者产生消息的速度时产生了溢出时,必须尽快处理。 
尽可能的提供无锁的异步流。 
  据以往的经验来看,我们知道异步编程是困难的,特别是当一个平台提供了很多选项如JVM。

Reactor瞄准绝大部分场景中真正的无阻塞,并且提供了一组比原生Jdk的java.util.concurrent库更高效的API。Reactor也提供了一个可选性(不建议使用):

  阻塞等待:如Future.get()。

 Unsafe数据获取:如ReentrantLock.lock()。

  异常抛出:如try ..catch …finally

  同步阻塞:如 syschronized

  Wrapper配置(GC压力):例如 new Wrapper(event)

让我们先使用一个纯正的Executor方法:

复制代码

private ExecutorService  threadPool = Executors.newFixedThreadPool(8);

final List<T> batches = new ArrayList<T>();

Callable<T> t = new Callable<T>() {  //1

    public T run() {
        synchronized(batches) {  //2
            T result = callDatabase(msg); //3
            batches.add(result);
            return result;
        }
    }
};

Future<T> f = threadPool.submit(t); //4
T result = f.get()  //5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

复制代码 
  1.分配回调方法—可能会导致gc压力。

  2.Synchronization将强制对每个线程停止检查。

  3. 存在消费者的消费能力低于生产者生产能力的隐患。

  4. 使用线程池将task传递到目标线程–肯定通过FutureTask给gc造成压力。

  5. 阻塞直至callDatabase()响应。

从上述的简单示例中,容易看出扩展性会受到严重的影响。

  不断分配的对象将导致gc停止工作,特别是耗时比较多的大任务时。当一个gc停止工作时将会从降低全局的性能。

  队列默认情况下长度是不受限制的。任务会堆积到数据库中。

   后台日志不是一个内存泄露的地方,但是副作用就比较烦人了:在gc暂停工作时需要扫描更多对象;损失数据重要bit的风险;等等。

    经典链接Queue分配节点时产生的内存压力。

  使用阻塞方式应答请求时发生恶性循环。

    阻塞方式应答导致生产者效率慢下来。实际上,因为需要提交更多任务时等待响应,流程变成了基本的同步方式。

    同数据存储的通信异常将以不友好的形式传递到生产者,通过线程边界来分离工作,这使容错的协商变的比较容易。

  完全的、真正的非阻塞比较难以实现—特别是有比较时髦名称的分布式系统中如微服务架构。然而,Reactor却没有妥协,它试图利用可用的最佳模式来使开发者不必觉得像是在写一个数学论文而仅仅是一个微服务(nanservice)。

spring reactor 多线程配置 
首先搭建spring mybatis项目,前面博客已经有搭建步骤 
框架spring reactor,可以帮助我们新开一个异步的线程来处理一些比如记录日志的功能,这样就能节约后台相应的时间。 
1:引入jar包,这里使用的是maven,只需要引用一个jar包就行了

<!-- https://mvnrepository.com/artifact/org.projectreactor/reactor-spring -->
        <dependency>
            <groupId>org.projectreactor</groupId>
            <artifactId>reactor-spring</artifactId>
            <version>1.0.1.RELEASE</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2:写一个reactor的配置的bean

package com.baobaotao.reactor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.spring.context.config.EnableReactor;

@Configuration
@EnableReactor
public class ReactorConfig {

    @Bean(name="rootReactor")
    public Reactor rootReactor(Environment env){
        return Reactors.reactor().env(env).get();
    }

    @Bean(name = "reportReactor")
    public Reactor reportReactor(Environment env) {
        return Reactors.reactor().env(env).get();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

3:事件的处理类,一般是以Hander结尾,方便区分:

package com.baobaotao.reactor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;


import reactor.core.Reactor;
import reactor.event.Event;
import reactor.spring.annotation.Selector;

@Component
public class IndexHandler {

    @Autowired
    @Qualifier("rootReactor")
    private Reactor reactor ;
    @Autowired
    @Qualifier("reportReactor")
    private Reactor reactorxx ;


    @Selector(value="hello",reactor="@rootReactor")
    public void handleTestTopic(Event<String> evt)throws Exception{
        System.out.println("************");
    }

    @Selector(value="hellos",reactor="@reportReactor")
    public void handleTestTopics(Event<String> evt)throws Exception{
        System.out.println("xxxxxx**********");
        String data = evt.getData();
        System.out.println(data);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

4:最后就是在controller或者service里面通知新开线程了:

package com.baobaotao.reactor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import reactor.core.Reactor;
import reactor.event.Event;

@Controller
@RequestMapping("/baobaotao/recator/")
public class IndexController {

    @Autowired
    @Qualifier("rootReactor")
    private Reactor r;

    @Autowired
    @Qualifier("reportReactor")
    private Reactor rx;
    @RequestMapping("/chen")
    @Transactional
    @ResponseBody
    public void chen() {
        r.notify("hello", Event.wrap("你好"));
    }

    @RequestMapping("/chenzy")
    @Transactional
    @ResponseBody
    public void chenzy() {
        rx.notify("hellos", Event.wrap("好"));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

启动程序请求http://127.0.0.1/baobaotao/recator/chenzy 
就可以看到log输出

System.out.println("xxxxxx**********");
        String data = evt.getData();
        System.out.println(data);
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

atom react 配置 相关内容

2015-09-13 00:05:00 weixin_34192816 阅读数 0

 1:引入jar包,这里使用的是maven,只需要引用一个jar包就行了

  <DEPENDENCY>

  <GROUPID>org.projectreactor</GROUPID>

  reactor-spring</ARTIFACTID>

  <VERSION>1.0.0.RELEASE</VERSION>

  </DEPENDENCY>

  2:写一个reactor的配置的bean

  @Configuration

  @EnableReactor

  public class ReactorConfig {

  @Bean(name = "rootReactor")

  public Reactor rootReactor(Environment env) {

  return Reactors.reactor()。env(env)。get();

  }

  @Bean(name = "reportReactor")

  public Reactor reportReactor(Environment env) {

  return Reactors.reactor()。env(env)。get();

  }

  }

  3:事件的处理类,一般是以Hander结尾,方便区分:

  @Component

  public class IndexHandler {

  @Autowired

  @Qualifier("rootReactor")

  private Reactor reactor;

  @Selector(value = "hello", reactor = "@rootReactor")

  public void handleTestTopic(Event<STRING> evt) throws Exception {

  System.out.println("************");

  }

  }

  4:最后就是在controller或者service里面通知新开线程了:

  @Controller

  public class IndexController {

  @Autowired

  @Qualifier("rootReactor")

  private Reactor r;

  @RequestMapping("chen")

  @Transactional

  public void chen() {

  r.notify("hello", Event.wrap("你好"));

  }

  }











---------------------------------------------------------------------------------------



Reactor - A Foundation for Reactive FastData Appli该文简单介绍了Spring reactor 1.0的基本特性。

目前reactor是作为Spring.io核心包下面项目。

Reactor 是一个基础性库包
–定位在用户级和低级之间的灰色区域的抽象。
– 能够在Reactor上建立组件和应用核心
– 驱动器 服务器和数据整合库,领域整合库,事件驱动架构

Reactor的应用是reactive的。
– 属于Reactive Extensions in .NET
– 类似Netflix RxJava
– 观察者模式Observer pattern

Reactor应用基于一个Selector发送事件。
– 象一个消息系统中routing topic, 但是它是一个对象
– 支持Regex, URI template, Class.isAssingableFrom, 定制逻辑

Reactor Core内部封装了LMAX Disruptor的RingBuffer,再通过Reactor-Spring等支持支持各种Spring应用,如下图:


13001357_BNbk.jpg



Reactor演示代码

Environment env = new Environment();
Reactor reactor = Reactors.reactor()
                               .env(env)
                               .dispatcher(RING_BUFFER)
                               .get();

reactor.on($(“topic”), (Event<String> ev) → {
                             System.out.println(“Hello “ + ev.getData());
                  });

reactor.notify(“topic”, Event.wrap(“John Doe”));

RING_BUFFER是Disruptor的RingBuffer操作,熟悉Disruptor的应该知道。

reactor.notify发送一个事件,而reactor.on能够接受到这个事件即时响应。

Reactor 的分发器 Dispatchers 类似Akka的分发器
● 分发器管理任务执行,有下面几种:
– ThreadPoolExecutorDispatcher
● 标准的 ThreadPoolExecutor

– BlockingQueueDispatcher
● 能够进行事件轮询

– RingBufferDispatcher
● LMAX Disruptor RingBuffer

– SynchronousDispatcher


Reactor的 Selectors
● Selectors 是一个等式的左边。
– 一个Selector能够被任何对象使用$(obj)创建
(或者: Selectors.object(obj))
– 一个Selector能够从匹配的key中释放数据
– Predicate<T> Selectors 能够创建匹配特定领域准则
(domain-specific criteria)

比如RegexSelector:
reactor.on(R(“some.(.+)”), (Event<String> ev) → {
// s will be 'topic'
String s = ev.getHeaders().get(“group1”);
});

reactor.notify(“some.topic”, Event.wrap(“John Doe”));

其中R(“some.(.*)”)匹配事件发送者“some.topic”。

UriTemplateSelector能够从URI匹配字符串:
reactor.on(U(“/some/{topic}”), (Event<String> ev) → {
// s will be 'topic'
String s = ev.getHeaders().get(“topic”);
});
reactor.notify(“/some/topic”, Event.wrap(“John Doe”));

Reactor 的Stream
● Streams允许基于数据的函数组合composition 
– Callback++
– 类似Netflix RxJava Observable, JDK 8 Stream

Stream<String> str;
str.map(String::toUpperCase)
     .filter(new Predicate<String>() { public boolean test(String s) { … }
     })
    .consume(s → log.info(“consumed string {}”, s));


Reactor 的 Promise
允许在Stream之间分享函数 

Promise<String> p;
String s = p
        .onSuccess(s → log.info(“consumed string {}”, s))
        .onFailure(t → log.error(t.getMessage(), t))
        .onComplete(t → log.info(“complete”))
        .await(5, SECONDS);

p.map(String::toUpperCase).consume(s → log.info(“UC: {}”, s));


Reactor 的 Processor
干脆直接将Disruptor API转为Reactor API
对于#UberFastData有超级快性能

Processor<Buffer> proc;
Operation<Buffer> op = proc.prepare();
op.get().append(data).flip();
op.commit();
proc.batch(512, buff → buff.append(data).flip());


与Spring整合:
首先使用@EnableReactor 激活reactor
@Configuration
@EnableReactor public class ReactorConfiguration {

  @Bean public Reactor input(Environment env) { return Reactors.reactor().env(env)
                  .dispatcher(RING_BUFFER).get();
   }

   @Bean public Reactor output(Environment env) { return Reactors.reactor().env(env)
                  .dispatcher(RING_BUFFER).get();
}


然后在监听者或观察者写入:
@Component public class SimpleHandler {
    @Autowired private Reactor reactor;

    @Selector(“test.topic”) public void onTestTopic(String s) { // Handle data }
} 


reactor的groovy整合:
@CompileStatic
def welcome(){
    reactor.on('greetings') { String s ->
            reply “hello $s”
            reply “how are you?”
}
reactor.notify 'greetings', 'Jon'
           reactor.send('greetings', 'Stephane'){
                  println it
            cancel()
           }
}

转载于:https://my.oschina.net/pvpCC9IFwqz4/blog/505422

atom react 配置 相关内容

2016-12-14 16:04:23 chenaini119 阅读数 3437

Reactor简介

Reactor是一个基础库,用在构建实时数据流应用、要求有容错和低延迟至毫秒、纳秒、皮秒的服务。

— PrefaceTL;DR

什么是Reactor?

  让我们大致了解一下Reactor。在你使用喜欢的搜索敲入一些关键词如Reactive、spring Reactive、Asynchronous java或者仅仅是”What the heck is Reactor?”.简而言之,Reactor是一个轻量级的JVM基础库,它可以帮助我们构建的服务和应用高效而异步的传递消息。

高效的含义是什么呢?
传递一个消息从A到B时GC产生的内存很小或者完全没有。
当消费者处理消息的速度低于生产者产生消息的速度时产生了溢出时,必须尽快处理。
尽可能的提供无锁的异步流。
  据以往的经验来看,我们知道异步编程是困难的,特别是当一个平台提供了很多选项如JVM。

Reactor瞄准绝大部分场景中真正的无阻塞,并且提供了一组比原生Jdk的java.util.concurrent库更高效的API。Reactor也提供了一个可选性(不建议使用):

  阻塞等待:如Future.get()。

 Unsafe数据获取:如ReentrantLock.lock()。

  异常抛出:如try ..catch …finally

  同步阻塞:如 syschronized

  Wrapper配置(GC压力):例如 new Wrapper(event)

让我们先使用一个纯正的Executor方法:

复制代码

private ExecutorService  threadPool = Executors.newFixedThreadPool(8);

final List<T> batches = new ArrayList<T>();

Callable<T> t = new Callable<T>() {  //1

    public T run() {
        synchronized(batches) {  //2
            T result = callDatabase(msg); //3
            batches.add(result);
            return result;
        }
    }
};

Future<T> f = threadPool.submit(t); //4
T result = f.get()  //5

复制代码
  1.分配回调方法—可能会导致gc压力。

  2.Synchronization将强制对每个线程停止检查。

  3. 存在消费者的消费能力低于生产者生产能力的隐患。

  4. 使用线程池将task传递到目标线程–肯定通过FutureTask给gc造成压力。

  5. 阻塞直至callDatabase()响应。

从上述的简单示例中,容易看出扩展性会受到严重的影响。

  不断分配的对象将导致gc停止工作,特别是耗时比较多的大任务时。当一个gc停止工作时将会从降低全局的性能。

  队列默认情况下长度是不受限制的。任务会堆积到数据库中。

   后台日志不是一个内存泄露的地方,但是副作用就比较烦人了:在gc暂停工作时需要扫描更多对象;损失数据重要bit的风险;等等。

    经典链接Queue分配节点时产生的内存压力。

  使用阻塞方式应答请求时发生恶性循环。

    阻塞方式应答导致生产者效率慢下来。实际上,因为需要提交更多任务时等待响应,流程变成了基本的同步方式。

    同数据存储的通信异常将以不友好的形式传递到生产者,通过线程边界来分离工作,这使容错的协商变的比较容易。

  完全的、真正的非阻塞比较难以实现—特别是有比较时髦名称的分布式系统中如微服务架构。然而,Reactor却没有妥协,它试图利用可用的最佳模式来使开发者不必觉得像是在写一个数学论文而仅仅是一个微服务(nanservice)。

spring reactor 多线程配置
首先搭建spring mybatis项目,前面博客已经有搭建步骤
框架spring reactor,可以帮助我们新开一个异步的线程来处理一些比如记录日志的功能,这样就能节约后台相应的时间。
1:引入jar包,这里使用的是maven,只需要引用一个jar包就行了

<!-- https://mvnrepository.com/artifact/org.projectreactor/reactor-spring -->
        <dependency>
            <groupId>org.projectreactor</groupId>
            <artifactId>reactor-spring</artifactId>
            <version>1.0.1.RELEASE</version>
        </dependency>

2:写一个reactor的配置的bean

package com.baobaotao.reactor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.spec.Reactors;
import reactor.spring.context.config.EnableReactor;

@Configuration
@EnableReactor
public class ReactorConfig {

    @Bean(name="rootReactor")
    public Reactor rootReactor(Environment env){
        return Reactors.reactor().env(env).get();
    }

    @Bean(name = "reportReactor")
    public Reactor reportReactor(Environment env) {
        return Reactors.reactor().env(env).get();
    }
}

3:事件的处理类,一般是以Hander结尾,方便区分:

package com.baobaotao.reactor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;


import reactor.core.Reactor;
import reactor.event.Event;
import reactor.spring.annotation.Selector;

@Component
public class IndexHandler {

    @Autowired
    @Qualifier("rootReactor")
    private Reactor reactor ;
    @Autowired
    @Qualifier("reportReactor")
    private Reactor reactorxx ;


    @Selector(value="hello",reactor="@rootReactor")
    public void handleTestTopic(Event<String> evt)throws Exception{
        System.out.println("************");
    }

    @Selector(value="hellos",reactor="@reportReactor")
    public void handleTestTopics(Event<String> evt)throws Exception{
        System.out.println("xxxxxx**********");
        String data = evt.getData();
        System.out.println(data);
    }

}

4:最后就是在controller或者service里面通知新开线程了:

package com.baobaotao.reactor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import reactor.core.Reactor;
import reactor.event.Event;

@Controller
@RequestMapping("/baobaotao/recator/")
public class IndexController {

    @Autowired
    @Qualifier("rootReactor")
    private Reactor r;

    @Autowired
    @Qualifier("reportReactor")
    private Reactor rx;
    @RequestMapping("/chen")
    @Transactional
    @ResponseBody
    public void chen() {
        r.notify("hello", Event.wrap("你好"));
    }

    @RequestMapping("/chenzy")
    @Transactional
    @ResponseBody
    public void chenzy() {
        rx.notify("hellos", Event.wrap("好"));
    }
}

启动程序请求http://127.0.0.1/baobaotao/recator/chenzy
就可以看到log输出

System.out.println("xxxxxx**********");
        String data = evt.getData();
        System.out.println(data);

atom react 配置 相关内容

2017-10-24 17:19:01 shida_csdn 阅读数 495

  Spring Reactor 默认自动加载 reactor-core jar 文件内的 reactor-environment.properties 配置文件

  


  有时我们需要更改这些默认配置,就需要载入自定义配置文件。


  设置载入 Spring Reactor 自定义配置文件十分简单,主要包含两个关键步骤:


  1. 在实例化 Env 时指定我们要使用的配置文件名称:

  


  2. 在项目resources 资源文件夹内,创建 META-INF/reactor/ 文件夹,并把配置文件放入该文件夹:

    


      保证项目打包后,配置文件位于 jar 包根目录下 META-INF/reactor/ 路径下即可。

atom react 配置 相关内容

2017-11-27 17:34:00 weixin_34409822 阅读数 4

1
2
3
4
5
6
7
8
9
10
11
12
cat /etc/salt/master
file_roots:
  base:
    - /srv/salt/base
  prod:
    - /srv/salt/prod
interface: 192.168.1.100
reactor:
    - 'salt/auth':
        - /srv/reactor/Minion.sls
    - 'salt/minion/Minion/start':
        - /srv/reactor/auto.sls
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
auto.sls
run_state:
    local.state.sls:
        - tgt: {{ data['id'] }}
        - arg:
            - shencan
run_init:
    local.cmd.run:
        - tgt: {{ data['id'] }}
        - arg:
            - echo initsok >>/tmp/cpis
Minion.sls
{% if 'act' in data and data['act'== 'pend' %}
minion_add:
  wheel.key.accept:
    - match: {{ data['id'] }}
{% endif %}
 
shencan.sls  这个位置/srv/salt/base
/tmp/example:
    file.managed:
        - source: salt://example
1
2
3
cat minion
master: 192.168.1.100
id: Minion


客户端启动后自动签发


具体看书


http://www.51niux.com/?id=120  SaltStack系列(五)之各种组件

https://docs.saltstack.com/en/latest/topics/reactor/

1
2
3
4
5
6
7
8
9
10
11
12
更新
file_roots:
  base:
    - /srv/salt/base
  prod:
    - /srv/salt/prod
interface: 192.168.1.100
reactor:
    - 'salt/auth':
        - /srv/reactor/auth.sls
    - 'salt/minion/*/start':
        - /srv/reactor/auto.sls



本文转自 liqius 51CTO博客,原文链接:http://blog.51cto.com/szgb17/1957214,如需转载请自行联系原作者

atom react 配置 相关内容

没有更多推荐了,返回首页