精华内容
下载资源
问答
  • Java多线程 - Disruptor2

    2019-10-03 00:06:50
    Disruptor框架中生产者、消费者的各种复杂依赖场景下的使用总结 具体而言,它可以解决如下方面: - 并行计算实现; - 串行依次执行; - 菱形方式执行; - 链式并行计算。 并且基于以上情况,每种类型的消费者都可以...

    Disruptor框架中生产者、消费者的各种复杂依赖场景下的使用总结

    具体而言,它可以解决如下方面:
    - 并行计算实现;
    - 串行依次执行;
    - 菱形方式执行;
    - 链式并行计算。
    并且基于以上情况,每种类型的消费者都可以池化,默认初始化多个同一类型的消费者实例,并行处理,提高系统吞吐量。

    本样例是一个生产者生产一个Long类型的数值,消费者对该数值进行处理的操作。本样例对以上各种情况的实现只是disruptor注册消费者的方式不同,因此,我们先把事件类、事件工厂类、消费者类、事件转换类和主函数贴出来。

    事件类

    public class LongEvent {
        private Long number;
    
        public Long getNumber() {
            return number;
        }
    
        public void setNumber(Long number) {
            this.number = number;
        }
    }

    事件工厂类

    public class LongEventFactory implements EventFactory<LongEvent> {
        @Override
        public LongEvent newInstance() {
            return new LongEvent();
        }
    }

    事件转换类

    public class LongEventTranslator implements EventTranslatorOneArg<LongEvent, Long> {
        @Override
        public void translateTo(LongEvent event, long sequence, Long arg0) {
            event.setNumber(arg0);
        }
    }

    C1-1消费者类
    该消费者执行将数值+10的操作。可以看到该消费者同时实现了EventHandler和WorkHandler两个接口。如果不需要池化,只需要实现EventHandler类即可。如果需要池化,只需要实现WorkHandler类即可。本例为了能够同时讲解池化和非池化的实现,因此同时实现了两个类,当然,也没啥问题。

    public class C11EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            long number = event.getNumber();
            number += 10;
            System.out.println(System.currentTimeMillis()+": c1-1 consumer finished.number=" + number);
        }
    
        @Override
        public void onEvent(LongEvent event) throws Exception {
            long number = event.getNumber();
            number += 10;
            System.out.println(System.currentTimeMillis()+": c1-1 consumer finished.number=" + number);
        }
    }

    C1-2消费者类
    该消费者类执行将数值乘以10的操作。

    public class C12EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            long number = event.getNumber();
            number *= 10;
            System.out.println(System.currentTimeMillis()+": c1-2 consumer finished.number=" + number);
        }
    
        @Override
        public void onEvent(LongEvent event) throws Exception {
            long number = event.getNumber();
            number *= 10;
            System.out.println(System.currentTimeMillis()+": c1-2 consumer finished.number=" + number);
        }
    }

    c2-1消费者类
    该消费者类负责将数值+20

    public class C21EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            long number = event.getNumber();
            number += 20;
            System.out.println(System.currentTimeMillis()+": c2-1 consumer finished.number=" + number);
        }
    
        @Override
        public void onEvent(LongEvent event) throws Exception {
            long number = event.getNumber();
            number += 20;
            System.out.println(System.currentTimeMillis()+": c2-1 consumer finished.number=" + number);
        }
    }

    C2-2消费者类
    该消费者类负责将数值*20

    public class C22EventHandler implements EventHandler<LongEvent>,WorkHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            long number = event.getNumber();
            number *= 20;
            System.out.println(System.currentTimeMillis()+": c2-2 consumer finished.number=" + number);
        }
    
        @Override
        public void onEvent(LongEvent event) throws Exception {
            long number = event.getNumber();
            number *= 20;
            System.out.println(System.currentTimeMillis()+": c2-2 consumer finished.number=" + number);
        }
    }

    主函数

    public class Main {
        public static void main(String[] args) {
            int bufferSize = 1024*1024;//环形队列长度,必须是2的N次方
            EventFactory<LongEvent> eventFactory = new LongEventFactory();
            /**
             * 定义Disruptor,基于单生产者,阻塞策略
             */
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,bufferSize, Executors.defaultThreadFactory(),ProducerType.SINGLE,new BlockingWaitStrategy());
            /
            XXX(disruptor);//这里是调用各种不同方法的地方.
            /
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            /**
             * 输入10
             */
            ringBuffer.publishEvent(new LongEventTranslator(),10L);
            ringBuffer.publishEvent(new LongEventTranslator(),100L);
        }
    }

    并行计算实现

    并行计算就是消费者之间互相不依赖,并行执行,执行开始时间是一样的。

    /**
      * 并行计算实现,c1,c2互相不依赖
      * <br/>
      * p --> c11
      *   --> c21
      */
     public static void parallel(Disruptor<LongEvent> disruptor){
         disruptor.handleEventsWith(new C11EventHandler(),new C21EventHandler());
         disruptor.start();
     }

    串行计算,依次执行

    /**
      * 串行依次执行
      * <br/>
      * p --> c11 --> c21
      * @param disruptor
      */
     public static void serial(Disruptor<LongEvent> disruptor){
         disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());
         disruptor.start();
     }

    菱形方式执行

    /**
      * 菱形方式执行
      * <br/>
      *   --> c11
      * p          --> c21
      *   --> c12
      * @param disruptor
      */
     public static void diamond(Disruptor<LongEvent> disruptor){
         disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());
         disruptor.start();
     }

    链式并行计算

    /**
      * 链式并行计算
      * <br/>
      *   --> c11 --> c12
      * p
      *   --> c21 --> c22
      * @param disruptor
      */
     public static void chain(Disruptor<LongEvent> disruptor){
         disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());
         disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());
         disruptor.start();
     }

    上面的实例,每一种消费者都只有一个实例,如果想多个实例形成一个线程池并发处理多个任务怎么办?如果使用disruptor.handleEventWith(new C11EventHandler(),new C11EventHandler(),...)这种,会造成重复消费同一个数据,不是我们想要的。我们想要的是同一个类的实例消费不同的数据,怎么办?
    - 首先,消费者类需要实现WorkHandler接口,而不是EventHandler接口。为了方便,我们同时实现了这两个接口。
    - 其次,disruptor调用handleEventsWithWorkerPool方法,而不是handleEventsWith方法
    - 最后,实例化多个事件消费类。

    并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例

    /**
      * 并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例
      * <br/>
      * p --> c11
      *   --> c21
      */
     public static void parallelWithPool(Disruptor<LongEvent> disruptor){
         disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());
         disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());
         disruptor.start();
     }

    串行依次执行,同时C11,C21分别有2个实例


     

    /**
      * 串行依次执行,同时C11,C21分别有2个实例
       * <br/>
       * p --> c11 --> c21
       * @param disruptor
       */
      public static void serialWithPool(Disruptor<LongEvent> disruptor){
          disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());
          disruptor.start();
      }

     

    展开全文
  • producer:   import java.nio.ByteBuffer; import java.util.UUID; import bhz.base.LongEvent; ...import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer;

    producer:

         

    import java.nio.ByteBuffer;
    import java.util.UUID;


    import bhz.base.LongEvent;


    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.RingBuffer;


    /**
     * <B>系统名称:</B><BR>
     * <B>模块名称:</B><BR>
     * <B>中文类名:</B><BR>
     * <B>概要说明:</B><BR>
     * @author 北京尚学堂(alienware)
     * @since 2015年11月23日
     */
    public class Producer {


    private final RingBuffer<Order> ringBuffer;

    public Producer(RingBuffer<Order> ringBuffer){
    this.ringBuffer = ringBuffer;
    }

    /**
    * onData用来发布事件,每调用一次就发布一次事件
    * 它的参数会用过事件传递给消费者
    */
    public void onData(String data){
    //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
    long sequence = ringBuffer.next();
    try {
    //用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
    Order order = ringBuffer.get(sequence);
    //获取要通过事件传递的业务数据
    order.setId(data);
    } finally {
    //发布事件
    //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
    ringBuffer.publish(sequence);
    }
    }


    }


    customer:



    import java.util.concurrent.atomic.AtomicInteger;


    import com.lmax.disruptor.WorkHandler;


    public class Consumer implements WorkHandler<Order>{

    private String consumerId;

    private static AtomicInteger count = new AtomicInteger(0);

    public Consumer(String consumerId){
    this.consumerId = consumerId;
    }


    @Override
    public void onEvent(Order order) throws Exception {
    System.out.println("当前消费者: " + this.consumerId + ",消费信息:" + order.getId());
    count.incrementAndGet();
    }

    public int getCount(){
    return count.get();
    }


    }



    Order:



    public class Order {  

    private String id;//ID  
    private String name;
    private double price;//金额  

    public String getId() {
    return id;
    }
    public void setId(String id) {
    this.id = id;
    }
    public String getName() {
    return name;
    }
    public void setName(String name) {
    this.name = name;
    }
    public double getPrice() {
    return price;
    }
    public void setPrice(double price) {
    this.price = price;
    }
     
    }  


    Main:



    import java.nio.ByteBuffer;
    import java.util.UUID;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executors;


    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.ExceptionHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.WorkHandler;
    import com.lmax.disruptor.WorkerPool;
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.dsl.ProducerType;


    public class Main {

    public static void main(String[] args) throws Exception {


    //创建ringBuffer
    RingBuffer<Order> ringBuffer = 
    RingBuffer.create(ProducerType.MULTI, 
    new EventFactory<Order>() {  
               @Override  
               public Order newInstance() {  
                   return new Order();  
               }  
           }, 
           1024 * 1024, 
    new YieldingWaitStrategy());

    SequenceBarrier barriers = ringBuffer.newBarrier();

    Consumer[] consumers = new Consumer[3];
    for(int i = 0; i < consumers.length; i++){
    consumers[i] = new Consumer("c" + i);
    }

    WorkerPool<Order> workerPool = 
    new WorkerPool<Order>(ringBuffer, 
    barriers, 
    new IntEventExceptionHandler(),
    consumers);

            ringBuffer.addGatingSequences(workerPool.getWorkerSequences());  
            workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));  
            
            final CountDownLatch latch = new CountDownLatch(1);
            for (int i = 0; i < 100; i++) {  
            final Producer p = new Producer(ringBuffer);
            new Thread(new Runnable() {
    @Override
    public void run() {
    try {
    latch.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    for(int j = 0; j < 100; j ++){
    p.onData(UUID.randomUUID().toString());
    }
    }
    }).start();
            } 
            Thread.sleep(2000);
            System.out.println("---------------开始生产-----------------");
            latch.countDown();
            Thread.sleep(5000);
            System.out.println("总数:" + consumers[0].getCount() );
    }

    static class IntEventExceptionHandler implements ExceptionHandler {  
       public void handleEventException(Throwable ex, long sequence, Object event) {}  
       public void handleOnStartException(Throwable ex) {}  
       public void handleOnShutdownException(Throwable ex) {}  

    }


    展开全文
  • Disruptor

    2020-12-09 10:52:00
    背景 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现...目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。在

    背景

    Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

    目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。在美团技术团队它也有不少应用,有的项目架构借鉴了它的设计机制。本文从实战角度剖析了Disruptor的实现原理。

    需要特别指出的是,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。另外,本文所描述的Disruptor特性限于3.3.4。

    Java内置队列

    介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。

    队列 有界性 数据结构
    ArrayBlockingQueue bounded 加锁 arraylist
    LinkedBlockingQueue optionally-bounded 加锁 linkedlist
    ConcurrentLinkedQueue unbounded 无锁 linkedlist
    LinkedTransferQueue unbounded 无锁 linkedlist
    PriorityBlockingQueue unbounded 加锁 heap
    DelayQueue unbounded 加锁 heap

    队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

    我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。

    通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。

    ArrayBlockingQueue的问题

    ArrayBlockingQueue在实际使用过程中,会因为加锁和伪共享等出现严重的性能问题,我们下面来分析一下。

    加锁

    现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。

    Disruptor论文中讲述了一个实验:

    • 这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
    • 机器环境:2.4G 6核
    • 运算: 64位的计数器累加5亿次

    |Method | Time (ms) | |— | —| |Single thread | 300| |Single thread with CAS | 5,700| |Single thread with lock | 10,000| |Single thread with volatile write | 4,700| |Two threads with CAS | 30,000| |Two threads with lock | 224,000|

    CAS操作比单线程无锁慢了1个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。

    单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能。

    在多线程情况下,为了保证线程安全,必须使用CAS或锁,这种情况下,CAS的性能超过锁的性能,前者大约是后者的8倍。

    综上可知,加锁的性能是最差的。

    关于锁和CAS

    保证线程安全一般分成两种方式:锁和原子变量。

    图1 通过加锁的方式实现线程安全

    图1 通过加锁的方式实现线程安全

    采取加锁的方式,默认线程会冲突,访问数据时,先加上锁再访问,访问之后再解锁。通过锁界定一个临界区,同时只有一个线程进入。如上图所示,Thread2访问Entry的时候,加了锁,Thread1就不能再执行访问Entry的代码,从而保证线程安全。

    下面是ArrayBlockingQueue通过加锁的方式实现的offer方法,保证线程安全。

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    

    原子变量

    原子变量能够保证原子性的操作,意思是某个任务在执行过程中,要么全部成功,要么全部失败回滚,恢复到执行之前的初态,不存在初态和成功之间的中间状态。例如CAS操作,要么比较并交换成功,要么比较并交换失败。由CPU保证原子性。

    通过原子变量可以实现线程安全。执行某个任务的时候,先假定不会有冲突,若不发生冲突,则直接执行成功;当发生冲突的时候,则执行失败,回滚再重新操作,直到不发生冲突。

    图2 通过原子变量CAS实现线程安全

    图2 通过原子变量CAS实现线程安全

    如图所示,Thread1和Thread2都要把Entry加1。若不加锁,也不使用CAS,有可能Thread1取到了myValue=1,Thread2也取到了myValue=1,然后相加,Entry中的value值为2。这与预期不相符,我们预期的是Entry的值经过两次相加后等于3。

    CAS会先把Entry现在的value跟线程当初读出的值相比较,若相同,则赋值;若不相同,则赋值执行失败。一般会通过while/for循环来重新执行,直到赋值成功。

    代码示例是AtomicInteger的getAndAdd方法。CAS是CPU的一个指令,由CPU保证原子性。

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final int getAndAdd(int delta) {
        for (;;) {
            int current = get();
            int next = current + delta;
            if (compareAndSet(current, next))
                return current;
        }
    }
      
    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    } 
    

    在高度竞争的情况下,锁的性能将超过原子变量的性能,但是更真实的竞争情况下,原子变量的性能将超过锁的性能。同时原子变量不会有死锁等活跃性问题。

    伪共享

    什么是共享

    下图是计算的基本结构。L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。所以L1缓存很小但很快,并且紧靠着在使用它的CPU内核;L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享。

    图3 计算机CPU与缓存示意图

    图3 计算机CPU与缓存示意图

    当CPU执行运算的时候,它先去L1查找所需的数据、再去L2、然后是L3,如果最后这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。所以如果你在做一些很频繁的事,你要尽量确保数据在L1缓存中。

    另外,线程之间共享一份数据的时候,需要一个线程把数据写回主存,而另一个线程访问主存中相应的数据。

    下面是从CPU访问不同层级数据的时间概念:

    从CPU到 大约需要的CPU周期 大约需要的时间
    主存 - 约60-80ns
    QPI 总线传输(between sockets, not drawn) - 约20ns
    L3 cache 约40-45 cycles 约15ns
    L2 cache 约10 cycles 约3ns
    L1 cache 约3-4 cycles 约1ns
    寄存器 1 cycle -

    可见CPU读取主存中的数据会比从L1中读取慢了近2个数量级。

    缓存行

    Cache是由很多个cache line组成的。每个cache line通常是64字节,并且它有效地引用主内存中的一块儿地址。一个Java的long类型变量是8字节,因此在一个缓存行中可以存8个long类型的变量。

    CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个cache line。

    在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个。因此你能非常快的遍历这个数组。事实上,你可以非常快速的遍历在连续内存块中分配的任意数据结构。

    下面的例子是测试利用cache line的特性和不利用cache line的特性的效果对比。

    package com.meituan.FalseSharing;
     
    /**
     * @author gongming
     * @description
     * @date 16/6/4
     */
    public class CacheLineEffect {
        //考虑一般缓存行大小是64字节,一个 long 类型占8字节
        static  long[][] arr;
     
        public static void main(String[] args) {
            arr = new long[1024 * 1024][];
            for (int i = 0; i < 1024 * 1024; i++) {
                arr[i] = new long[8];
                for (int j = 0; j < 8; j++) {
                    arr[i][j] = 0L;
                }
            }
            long sum = 0L;
            long marked = System.currentTimeMillis();
            for (int i = 0; i < 1024 * 1024; i+=1) {
                for(int j =0; j< 8;j++){
                    sum = arr[i][j];
                }
            }
            System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");
     
            marked = System.currentTimeMillis();
            for (int i = 0; i < 8; i+=1) {
                for(int j =0; j< 1024 * 1024;j++){
                    sum = arr[j][i];
                }
            }
            System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");
        }
    }
    

    在2G Hz、2核、8G内存的运行环境中测试,速度差一倍。

    结果:

    Loop times:30ms Loop times:65ms

    什么是伪共享

    ArrayBlockingQueue有三个成员变量: - takeIndex:需要被取走的元素下标 - putIndex:可被元素插入的位置的下标 - count:队列中元素的数量

    这三个变量很容易放到一个缓存行中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果。

    图4 ArrayBlockingQueue伪共享示意图

    图4 ArrayBlockingQueue伪共享示意图

    如上图所示,当生产者线程put一个元素到ArrayBlockingQueue时,putIndex会修改,从而导致消费者线程的缓存中的缓存行无效,需要从主存中重新读取。

    这种无法充分使用缓存行特性的现象,称为伪共享。

    对于伪共享,一般的解决方案是,增大数组元素的间隔使得由不同线程存取的元素位于不同的缓存行上,以空间换时间。

    package com.meituan.FalseSharing;
     
    public class FalseSharing implements Runnable{
            public final static long ITERATIONS = 500L * 1000L * 100L;
            private int arrayIndex = 0;
     
            private static ValuePadding[] longs;
            public FalseSharing(final int arrayIndex) {
                this.arrayIndex = arrayIndex;
            }
     
            public static void main(final String[] args) throws Exception {
                for(int i=1;i<10;i++){
                    System.gc();
                    final long start = System.currentTimeMillis();
                    runTest(i);
                    System.out.println("Thread num "+i+" duration = " + (System.currentTimeMillis() - start));
                }
     
            }
     
            private static void runTest(int NUM_THREADS) throws InterruptedException {
                Thread[] threads = new Thread[NUM_THREADS];
                longs = new ValuePadding[NUM_THREADS];
                for (int i = 0; i < longs.length; i++) {
                    longs[i] = new ValuePadding();
                }
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new Thread(new FalseSharing(i));
                }
     
                for (Thread t : threads) {
                    t.start();
                }
     
                for (Thread t : threads) {
                    t.join();
                }
            }
     
            public void run() {
                long i = ITERATIONS + 1;
                while (0 != --i) {
                    longs[arrayIndex].value = 0L;
                }
            }
     
            public final static class ValuePadding {
                protected long p1, p2, p3, p4, p5, p6, p7;
                protected volatile long value = 0L;
                protected long p9, p10, p11, p12, p13, p14;
                protected long p15;
            }
            public final static class ValueNoPadding {
                // protected long p1, p2, p3, p4, p5, p6, p7;
                protected volatile long value = 0L;
                // protected long p9, p10, p11, p12, p13, p14, p15;
            }
    }
    

    在2G Hz,2核,8G内存, jdk 1.7.0_45 的运行环境下,使用了共享机制比没有使用共享机制,速度快了4倍左右。

    结果:

    • Thread num 1 duration = 447
    • Thread num 2 duration = 463
    • Thread num 3 duration = 454
    • Thread num 4 duration = 464
    • Thread num 5 duration = 561
    • Thread num 6 duration = 606
    • Thread num 7 duration = 684
    • Thread num 8 duration = 870
    • Thread num 9 duration = 823

    把代码中ValuePadding都替换为ValueNoPadding后的结果:

    • Thread num 1 duration = 446
    • Thread num 2 duration = 2549
    • Thread num 3 duration = 2898
    • Thread num 4 duration = 3931
    • Thread num 5 duration = 4716
    • Thread num 6 duration = 5424
    • Thread num 7 duration = 4868
    • Thread num 8 duration = 4595
    • Thread num 9 duration = 4540

    备注:在jdk1.8中,有专门的注解@Contended来避免伪共享,更优雅地解决问题。

    Disruptor的设计方案

    Disruptor通过以下设计来解决队列速度慢的问题:

    • 环形数组结构

    为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

    • 元素位置定位

    数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

    • 无锁设计

    每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

    下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量CAS,保证操作的线程安全。

    一个生产者

    写数据

    生产者单线程写数据的流程比较简单:

    1. 申请写入m个元素;
    2. 若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
    3. 若是返回的正确,则生产者开始写入元素。

    图5 单个生产者生产过程示意图

    图5 单个生产者生产过程示意图

    多个生产者

    多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

    但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

    下面分读数据和写数据两种情况介绍。

    读数据

    生产者多线程写入的情况会复杂很多:

    1. 申请读取到序号n;
    2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
    3. 消费者读取元素。

    如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。

    读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。

    然后,消费者读取下标从3到6共计4个元素。

    图6 多个生产者情况下,消费者消费过程示意图

    图6 多个生产者情况下,消费者消费过程示意图

    写数据

    多个生产者写入的时候:

    1. 申请写入m个元素;
    2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
    3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

    如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。

    Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。

    图7 多个生产者情况下,生产者生产过程示意图

    图7 多个生产者情况下,生产者生产过程示意图

    防止不同生产者对同一段空间写入的代码,如下所示:

    public long tryNext(int n) throws InsufficientCapacityException
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
     
        long current;
        long next;
     
        do
        {
            current = cursor.get();
            next = current + n;
     
            if (!hasAvailableCapacity(gatingSequences, n, current))
            {
                throw InsufficientCapacityException.INSTANCE;
            }
        }
        while (!cursor.compareAndSet(current, next));
     
        return next;
    }
    

    通过do/while循环的条件cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While循环重新执行,申请写入空间。

    消费者的流程与生产者非常类似,这儿就不多描述了。

    总结

    Disruptor通过精巧的无锁设计实现了在高并发情形下的高性能。

    在美团内部,很多高并发场景借鉴了Disruptor的设计,减少竞争的强度。其设计思想可以扩展到分布式场景,通过无锁设计,来提升服务性能。

    使用Disruptor比使用ArrayBlockingQueue略微复杂,为方便读者上手,增加代码样例。

    代码实现的功能:每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端。详细逻辑请细读代码。

    以下代码基于3.3.4版本的Disruptor包。

    package com.meituan.Disruptor;
    
    /**
     * @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
     */
    import com.lmax.disruptor.*;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    import java.util.concurrent.ThreadFactory;
    
    
    public class DisruptorMain
    {
        public static void main(String[] args) throws Exception
        {
            // 队列中的元素
            class Element {
    
                private int value;
    
                public int get(){
                    return value;
                }
    
                public void set(int value){
                    this.value= value;
                }
    
            }
    
            // 生产者的线程工厂
            ThreadFactory threadFactory = new ThreadFactory(){
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "simpleThread");
                }
            };
    
            // RingBuffer生产工厂,初始化RingBuffer的时候使用
            EventFactory<Element> factory = new EventFactory<Element>() {
                @Override
                public Element newInstance() {
                    return new Element();
                }
            };
    
            // 处理Event的handler
            EventHandler<Element> handler = new EventHandler<Element>(){
                @Override
                public void onEvent(Element element, long sequence, boolean endOfBatch)
                {
                    System.out.println("Element: " + element.get());
                }
            };
    
            // 阻塞策略
            BlockingWaitStrategy strategy = new BlockingWaitStrategy();
    
            // 指定RingBuffer的大小
            int bufferSize = 16;
    
            // 创建disruptor,采用单生产者模式
            Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
    
            // 设置EventHandler
            disruptor.handleEventsWith(handler);
    
            // 启动disruptor的线程
            disruptor.start();
    
            RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
    
            for (int l = 0; true; l++)
            {
                // 获取下一个可用位置的下标
                long sequence = ringBuffer.next();  
                try
                {
                    // 返回可用位置的元素
                    Element event = ringBuffer.get(sequence); 
                    // 设置该位置元素的值
                    event.set(l); 
                }
                finally
                {
                    ringBuffer.publish(sequence);
                }
                Thread.sleep(10);
            }
        }
    }
    

    性能

    以下面这些模式测试性能:

    吞吐量测试数据(每秒的数量)如下。

    环境: - CPU:Intel Core i7 860 @ 2.8 GHz without HT - JVM:Java 1.6.0_25 64-bit - OS:Windows 7

    - ABQ Disruptor
    Unicast: 1P – 1C 5,339,256 25,998,336
    Pipeline: 1P – 3C 2,128,918 16,806,157
    Sequencer: 3P – 1C 5,539,531 13,403,268
    Multicast: 1P – 3C 1,077,384 9,377,871
    Diamond: 1P – 3C 2,113,941 16,143,613

    环境:

    • CPU:Intel Core i7-2720QM
    • JVM:Java 1.6.0_25 64-bit
    • OS:Ubuntu 11.04
    - ABQ Disruptor
    Unicast: 1P – 1C 4,057,453 22,381,378
    Pipeline: 1P – 3C 2,006,903 15,857,913
    Sequencer: 3P – 1C 2,056,118 14,540,519
    Multicast: 1P – 3C 260,733 10,860,121
    Diamond: 1P – 3C 2,082,725 15,295,197

    依据并发竞争的激烈程度的不同,Disruptor比ArrayBlockingQueue吞吐量快4~7倍。

    按照Pipeline: 1P – 3C的连接模式测试延迟,生产者两次写入之间的延迟为1ms。

    运行环境:

    • CPU:2.2GHz Core i7-2720QM
    • Java: 1.6.0_25 64-bit
    • OS:Ubuntu 11.04.
    - Array Blocking Queue (ns) Disruptor (ns)
    99% observations less than 2,097,152 128
    99.99% observations less than 4,194,304 8,192
    Max Latency 5,069,086 175,567
    Mean Latency 32,757 52
    Min Latency 145 29

    可见,平均延迟差了3个数量级。

    等待策略

    生产者的等待策略

    暂时只有休眠1ns。

    LockSupport.parkNanos(1);
    

    消费者的等待策略

    名称 措施 适用场景
    BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
    BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
    PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
    SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
    TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
    YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀

    Log4j 2应用场景

    Log4j 2相对于Log4j 1最大的优势在于多线程并发场景下性能更优。该特性源自于Log4j 2的异步模式采用了Disruptor来处理。 在Log4j 2的配置文件中可以配置WaitStrategy,默认是Timeout策略。下面是Log4j 2中对WaitStrategy的配置官方文档:

    System Property Default Value Description
    AsyncLogger.WaitStrategy Timeout Valid values: Block, Timeout, Sleep, Yield. Block is a strategy that uses a lock and condition variable for the I/O thread waiting for log events. Block can be used when throughput and low-latency are not as important as CPU resource. Recommended for resource constrained/virtualised environments. Timeout is a variation of the Block strategy that will periodically wake up from the lock condition await() call. This ensures that if a notification is missed somehow the consumer thread is not stuck but will recover with a small latency delay (default 10ms). Sleep is a strategy that initially spins, then uses a Thread.yield(), and eventually parks for the minimum number of nanos the OS and JVM will allow while the I/O thread is waiting for log events. Sleep is a good compromise between performance and CPU resource. This strategy has very low impact on the application thread, in exchange for some additional latency for actually getting the message logged. Yield is a strategy that uses a Thread.yield() for waiting for log events after an initially spinning. Yield is a good compromise between performance and CPU resource, but may use more CPU than Sleep in order to get the message logged to disk sooner.

    性能差异

    loggers all async采用的是Disruptor,而Async Appender采用的是ArrayBlockingQueue队列。

    由图可见,单线程情况下,loggers all async与Async Appender吞吐量相差不大,但是在64个线程的时候,loggers all async的吞吐量比Async Appender增加了12倍,是Sync模式的68倍。

    图8 Log4j 2各个模式性能比较

    图8 Log4j 2各个模式性能比较

    美团在公司内部统一推行日志接入规范,要求必须使用Log4j 2,使普通单机QPS的上限不再只停留在几千,极高地提升了服务性能。

    参考文档

    1. http://brokendreams.iteye.com/blog/2255720
    2. http://ifeve.com/dissecting-disruptor-whats-so-special/
    3. https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
    4. https://lmax-exchange.github.io/disruptor/
    5. https://logging.apache.org/log4j/2.x/manual/async.html
    展开全文
  • disruptor

    2017-12-01 18:06:08
     经过2个月的疯狂加班后终于迎来了五一节前的几天清闲日子,在这闲得蛋疼的日子里,本屌丝无意中从ifeve上看到“Disruptor”并发框架,它号称"能够在一个线程里每秒处理6百万订单" 当时就被它吸引住了,不管是不是...
    
    

            经过2个月的疯狂加班后终于迎来了五一节前的几天清闲日子,在这闲得蛋疼的日子里,本屌丝无意中从ifeve上看到“Disruptor”并发框架,它号称"能够在一个线程里每秒处理6百万订单" 当时就被它吸引住了,不管是不是在吹流逼吧,先研究研究在说....

     

    扫盲:

    要想了解Disruptor框架必需多花点时间研究下它的工作原理,不然代码是没法撸的!!!

    关于Disruptor的详细资料及原理请细看!!!    http://ifeve.com/disruptor/

     

    Disruptor版本: 3.2.1

     

    名词解释

    消费者==事件处理器

     

     

    一、现在你必须明白以下问题:

    1、你必须明白Ringbuffer是什么,它的数据结构是怎么样的,有什么约定,为什么高效,它的职责是什么。

    2、ConsumerBarrier (ifeve网上的译文版本比较早,这个类在2.0.0之后就一直被改名,3.2.1的版本中它是SequenceBarrier)它的职责是什么。

     

    Disruptor框架在2.0版本之后不再采用生产者、消费者模型来编写API,而是使用事件模型,其实只是接口设计、类名和概念上的变化,内部原理其实还是一样的。

     

    DEMO 一、使用原生API创建一个简单的生产者和消费者

     

    Java代码  收藏代码
    1. //DEMO中使用的 消息全假定是一条交易  
    2. public class TradeTransaction {  
    3.     private String id;//交易ID  
    4.     private double price;//交易金额  
    5.       
    6.     public TradeTransaction() {  
    7.     }  
    8.     public TradeTransaction(String id, double price) {  
    9.         super();  
    10.         this.id = id;  
    11.         this.price = price;  
    12.     }  
    13.     public String getId() {  
    14.         return id;  
    15.     }  
    16.     public void setId(String id) {  
    17.         this.id = id;  
    18.     }  
    19.     public double getPrice() {  
    20.         return price;  
    21.     }  
    22.     public void setPrice(double price) {  
    23.         this.price = price;  
    24.     }  
    25. }  
    26.   
    27. public class TradeTransactionInDBHandler implements EventHandler<TradeTransaction>,WorkHandler<TradeTransaction> {  
    28.   
    29.     @Override  
    30.     public void onEvent(TradeTransaction event, long sequence,  
    31.             boolean endOfBatch) throws Exception {  
    32.         this.onEvent(event);  
    33.     }  
    34.   
    35.     @Override  
    36.     public void onEvent(TradeTransaction event) throws Exception {  
    37.         //这里做具体的消费逻辑  
    38.         event.setId(UUID.randomUUID().toString());//简单生成下ID  
    39.         System.out.println(event.getId());  
    40.     }  
    41. }  
    42.   
    43.   
    44.   
    45. public class Demo1 {  
    46.     public static void main(String[] args) throws InterruptedException, ExecutionException {  
    47.         int BUFFER_SIZE=1024;  
    48.         int THREAD_NUMBERS=4;  
    49.         /* 
    50.          * createSingleProducer创建一个单生产者的RingBuffer, 
    51.          * 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。 
    52.          * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 
    53.          * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 
    54.          */  
    55.         final RingBuffer<TradeTransaction> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TradeTransaction>() {  
    56.             @Override  
    57.             public TradeTransaction newInstance() {  
    58.                 return new TradeTransaction();  
    59.             }  
    60.         }, BUFFER_SIZE,new YieldingWaitStrategy());  
    61.         //创建线程池  
    62.         ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
    63.         //创建SequenceBarrier  
    64.         SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
    65.           
    66.         //创建消息处理器  
    67.         BatchEventProcessor<TradeTransaction> transProcessor = new BatchEventProcessor<TradeTransaction>(  
    68.                 ringBuffer, sequenceBarrier, new TradeTransactionInDBHandler());  
    69.           
    70.         //这一部的目的是让RingBuffer根据消费者的状态    如果只有一个消费者的情况可以省略  
    71.         ringBuffer.addGatingSequences(transProcessor.getSequence());  
    72.           
    73.         //把消息处理器提交到线程池  
    74.         executors.submit(transProcessor);  
    75.         //如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类  
    76.           
    77.         Future<?> future=executors.submit(new Callable<Void>() {  
    78.             @Override  
    79.             public Void call() throws Exception {  
    80.                 long seq;  
    81.                 for(int i=0;i<1000;i++){  
    82.                     seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块  
    83.                       
    84.                     ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据  如果此处不理解,想想RingBuffer的结构图  
    85.                       
    86.                     ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见  
    87.                 }  
    88.                 return null;  
    89.             }  
    90.         });  
    91.         future.get();//等待生产者结束  
    92.         Thread.sleep(1000);//等上1秒,等消费都处理完成  
    93.         transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)  
    94.         executors.shutdown();//终止线程  
    95.     }  
    96. }  

     

     DEMO二、使用WorkerPool辅助创建消费者

    Java代码  收藏代码
    1. public class Demo2 {  
    2.     public static void main(String[] args) throws InterruptedException {  
    3.         int BUFFER_SIZE=1024;  
    4.         int THREAD_NUMBERS=4;  
    5.         EventFactory<TradeTransaction> eventFactory=new EventFactory<TradeTransaction>() {  
    6.             public TradeTransaction newInstance() {  
    7.                 return new TradeTransaction();  
    8.             }  
    9.         };  
    10.         RingBuffer<TradeTransaction> ringBuffer=RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);  
    11.           
    12.         SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
    13.           
    14.         ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);  
    15.           
    16.         WorkHandler<TradeTransaction> workHandlers=new TradeTransactionInDBHandler();  
    17.         /* 
    18.          * 这个类代码很简单的,亲自己看哈!~ 
    19.          */  
    20.         WorkerPool<TradeTransaction> workerPool=new WorkerPool<TradeTransaction>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), workHandlers);  
    21.           
    22.         workerPool.start(executor);  
    23.           
    24.         //下面这个生产8个数据,图简单就写到主线程算了  
    25.         for(int i=0;i<8;i++){  
    26.             long seq=ringBuffer.next();  
    27.             ringBuffer.get(seq).setPrice(Math.random()*9999);  
    28.             ringBuffer.publish(seq);  
    29.         }  
    30.           
    31.         Thread.sleep(1000);  
    32.         workerPool.halt();  
    33.         executor.shutdown();  
    34.     }  
    35. }  

     DEMO三、demo3写个流弊点的像下图这样。这次用Disruptor来完成整个构建工作.

    从中图可以看出需求是介样子的:生产者生产数据经过C1,C2处理完成后再到C3。

    假设如下场景:

    1、交易网关收到交易(P1)把交易数据发到RingBuffer中,

    2、负责处理增值业务的消费者C1和负责数据存储的消费者C2负责处理交易

    3、负责发送JMS消息的消费者C3在C1和C2处理完成后再进行处理。

     

    让代码说话:

    Java代码  收藏代码
    1. public class TradeTransactionJMSNotifyHandler implements EventHandler<TradeTransaction> {  
    2.   
    3.     @Override  
    4.     public void onEvent(TradeTransaction event, long sequence,  
    5.             boolean endOfBatch) throws Exception {  
    6.         //do send jms message  
    7.     }  
    8. }  
    9.   
    10.   
    11.   
    12. public class TradeTransactionPublisher implements Runnable{  
    13.     Disruptor<TradeTransaction> disruptor;  
    14.     private CountDownLatch latch;  
    15.     private static int LOOP=10000000;//模拟一千万次交易的发生  
    16.   
    17.     public TradeTransactionPublisher(CountDownLatch latch,Disruptor<TradeTransaction> disruptor) {  
    18.         this.disruptor=disruptor;  
    19.         this.latch=latch;  
    20.     }  
    21.   
    22.     @Override  
    23.     public void run() {  
    24.         TradeTransactionEventTranslator tradeTransloator=new TradeTransactionEventTranslator();  
    25.         for(int i=0;i<LOOP;i++){  
    26.             disruptor.publishEvent(tradeTransloator);  
    27.         }  
    28.         latch.countDown();  
    29.     }  
    30.       
    31. }  
    32.   
    33. class TradeTransactionEventTranslator implements EventTranslator<TradeTransaction>{  
    34.     private Random random=new Random();  
    35.     @Override  
    36.     public void translateTo(TradeTransaction event, long sequence) {  
    37.         this.generateTradeTransaction(event);  
    38.     }  
    39.     private TradeTransaction generateTradeTransaction(TradeTransaction trade){  
    40.         trade.setPrice(random.nextDouble()*9999);  
    41.         return trade;  
    42.     }  
    43. }  
    44.   
    45.   
    46. public class TradeTransactionVasConsumer implements EventHandler<TradeTransaction> {  
    47.   
    48.     @Override  
    49.     public void onEvent(TradeTransaction event, long sequence,  
    50.             boolean endOfBatch) throws Exception {  
    51.         //do something....  
    52.     }  
    53.       
    54. }  
    55.   
    56.   
    57.   
    58. public class Demo3 {  
    59.     public static void main(String[] args) throws InterruptedException {  
    60.         long beginTime=System.currentTimeMillis();  
    61.           
    62.         int bufferSize=1024;  
    63.         ExecutorService executor=Executors.newFixedThreadPool(4);  
    64.         //这个构造函数参数,相信你在了解上面2个demo之后就看下就明白了,不解释了~  
    65.         Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() {  
    66.             @Override  
    67.             public TradeTransaction newInstance() {  
    68.                 return new TradeTransaction();  
    69.             }  
    70.         }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
    71.           
    72.         //使用disruptor创建消费者组C1,C2  
    73.         EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasConsumer(),new TradeTransactionInDBHandler());  
    74.           
    75.         TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler();  
    76.         //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3  
    77.         handlerGroup.then(jmsConsumer);  
    78.           
    79.           
    80.         disruptor.start();//启动  
    81.         CountDownLatch latch=new CountDownLatch(1);  
    82.         //生产者准备  
    83.         executor.submit(new TradeTransactionPublisher(latch, disruptor));  
    84.         latch.await();//等待生产者完事.  
    85.         disruptor.shutdown();  
    86.         executor.shutdown();  
    87.           
    88.         System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    89.     }  
    90. }  
    展开全文
  • Disruptor系列2Disruptor原理剖析

    千次阅读 2018-05-21 08:52:33
    都说Disruptor是高性能、低延迟的内存队列,每秒可以处理600W的订单,但是它为什么这么快呢?这就需要我们从他的底层设计原理开始剖析。我觉得,学习了他的实现原理,对自身了解Java并发内存结构是有很大的好处的,...
  • 1. Disruptor简单介绍 Disruptor是一个由LMAX开源的Java并发框架。LMAX是一种新型零售金融交易平台,这个系统是建立在 JVM 平台上,核心是一个业务逻辑处理器,它能够在一个线程里每秒处理 6 百万订单。业务逻辑...
  • disruptor 介绍

    万次阅读 多人点赞 2018-07-31 14:38:56
    一、背景 1.来源 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内部的内存队列的延迟问题,而...据目前资料显示:应用Disruptor的知名项目有如下的一些:Storm, Camel, Log4j2,还...
  • [size=medium]Disruptor类是启动类.我们可以通过类似这样构造一个Disruptor对象 [code="java"]ExecutorService exec = ...Disruptor disruptor = new Disruptor(ValueEvent.EVENT_FACTORY, 2, exec);[/...
  • Disruptor2

    2018-11-29 17:12:28
    4. 启动Disruptor,原子类型判断启动状态,ConsumerRepository实现了Iterable接口,则遍历的是consumerInfos集合 public RingBuffer<T> start() { checkOnlyStartedOnce(); for (final ConsumerInfo ...
  • Disruptor系列3:Disruptor样例实战

    千次阅读 2018-05-21 22:10:57
    - Disruptor系列2Disruptor原理剖析 本章节是Disruptor样例实战,依据Disruptor的工作流依次执行的特性,实现各种样例。如果想了解Disruptor是什么,可以查看章节 Disruptor系列1:初识Disruptor ,如果想深层次...
  • Disruptor使用

    2019-09-23 20:31:02
    Disruptor作者,介绍Disruptor能每秒处理600万订单。这是一个可怕的数字。 disruptor之所以那么快,是因为内部采用环形队列和无锁设计。... 1.Disruptor:用于控制整个消费者-生产者模型的处理器 2.Ri...
  • 了解Disruptor

    2020-03-01 17:28:55
    了解Disruptor Disruptor是一个单机版最高性能消息队列,最早由金融行业中发明,它的GitHub地址: ...本文主要对GitHub上的英文介绍进行简单翻译记录。...Michael Barker edited this page on 2 Mar 2015 · 8 ...
  • disruptor入门

    2021-04-12 10:36:43
    1 disruptor 是什么? Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/...目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了D
  • disruptor3 我决定对整个版本的Disruptor... 在发布此版本时,我最初走了几个阴暗的小巷,但与2.x版本的区别不大,但是又带来了一些好处。 我本来想实施一些功能测试并改善文档,但是我可能会一直等到那些方面,直...
  • Disruptor简介

    2018-04-08 16:44:50
    一.介绍Disruptor是java的并发框架,实现了无锁的队列,应用场景是“生产者-消费者”或“发布-订阅...根据消费者的依赖关系图,多播事件给多个消费者2.为事件提前分配内存3.可选择无锁二.重要概念Disruptor里面有一...
  • disruptor 核心链路的高级操作 disruptor 控制核心链路十分方便,并且可以串行,并行,菱形,多边形链路操作。 启动类 package com.tkn.disruptor.heigh.chain; import com.lmax.disruptor.BusySpinWaitStrategy; ...
  • 从log4j2Disruptor

    千次阅读 2019-02-22 15:44:00
    log4j2实现原理可查看:https://blog.csdn.net/u010597819/article/details/86646261 文章同样基于log4j-2.7版本,disruptor-3.3.6 相信看过log4j2的源码后大家应该...下面我们就跟着log4j2来走进Disruptor这个神...
  • Disruptor start

    2016-09-03 11:02:57
    1.Disruptor下载地址 https://github.com/LMAX-Exchange/disruptor   2.BlockingQueue http://wsmajunfeng.iteye.com/blog/1629354   2.并发编程网 http://ifeve.com/ali-think-3/#more-28323
  • 项目使用了log4j2,由于使用了全局异步打印日志的方式,还需要引入disruptor的依赖,最后使用的log4j2disruptor的版本依赖如下: &lt;dependency&gt; &lt;groupId&gt;org.apache.logging.log4j&...
  • Log4j2 LMAX disruptor

    2020-11-20 17:27:40
    LMAX disruptor exchanging data between concurrent threads the queuing of events between stages in the pipeline was dominating the costs. However it became evident that queues as a fundamental data ...
  • 2、使用该disruptor改进的线程池ThreadPool头文件。 因为是模板类头文件,可以直接部署到项目中 include: 1、two disruptor related headfile; 2、a ThreadPool with this disruptor for its buffer. blog:
  • 开篇  整个博文希望能够讲清楚Disruptor的producer和... Disruptor本质上是一个内存消息队列,适合生产者消费者模型,所以它的整个工作过程其实也就分三个大过程,分别是Disruptor本身启动、Disruptor生产者工作过...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 555
精华内容 222
关键字:

disruptor2