精华内容
下载资源
问答
  • Disruptor使用

    2019-09-23 20:31:02
    Disruptor作者,介绍Disruptor能每秒处理...使用cas来进行并发控制。通过获取可用下标来对事件发布和消费 下标通过cas控制(Atomic) disruptor组成部分 1.Disruptor:用于控制整个消费者-生产者模型的处理器 2.Ri...

    Disruptor作者,介绍Disruptor能每秒处理600万订单。这是一个可怕的数字。

    disruptor之所以那么快,是因为内部采用环形队列和无锁设计。使用cas来进行并发控制。通过获取可用下标来对事件发布和消费

    下标通过cas控制(Atomic)

    disruptor组成部分

              1.Disruptor:用于控制整个消费者-生产者模型的处理器 
       2.RingBuffer:用于存放数据 
       3.EventHandler:一个用于处理事件的接口(可以当做生产者,也可以当做消费者)。 
       4.EventFactory:事件工厂类。 
       5.WaitStrategy:用于实现事件处理等待RingBuffer游标策略的接口。 
       6.SequeueBarrier:队列屏障,用于处理访问RingBuffer的序列。 
       7.用于运行disruptor的线程或者线程池。

    Disruptor简单使用

    1.创建订单和订单事件

    package com.liqang.test;
    
    import java.math.BigDecimal;
    /**
     * 简单模拟一个订单
     * @author Administrator
     *
     */
    public class Order {
        private int id;
        private BigDecimal price;
        private double num;
        private int pid;
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public BigDecimal getPrice() {
            return price;
        }
        public void setPrice(BigDecimal price) {
            this.price = price;
        }
        public double getNum() {
            return num;
        }
        public void setNum(double num) {
            this.num = num;
        }
        public int getPid() {
            return pid;
        }
        public void setPid(int pid) {
            this.pid = pid;
        }
    }
    package com.liqang.test;
    
    import java.math.BigDecimal;
    //订单事件  disruptor容器都是以事件对存在
    public class OrderEvent {
    
        private Order order;
    
        public Order getOrder() {
            return order;
        }
    
        public void setOrder(Order order) {
            this.order = order;
        }
        
    }

    2.创建disruptor事件工厂

    package bhz.base;
    
    import com.lmax.disruptor.EventFactory;
    // 需要让disruptor为我们创建事件,我们同时还声明了一个EventFactory来实例化Event对象。
    public class LongEventFactory implements EventFactory { 
    
        @Override 
        public Object newInstance() { 
            return new LongEvent(); 
        } 
    } 

    disruptor会调用工厂方法为我们创建事件。并放到对应的事件槽里面

    3.创建事件消费者处理类

    /**
     * 事件消费者
     * @author Administrator
     *
     */
    public class OrderEventHandle implements EventHandler<OrderEvent>{
    
        @Override
        public void onEvent(OrderEvent orderEvent, long arg1, boolean arg2) throws Exception {
            /**
             *做相应的业务处理
             */
            System.out.println(orderEvent.getOrder().getPid());
            
            
        }
    
    }

    4.创建事件生产者类

    /**
     * 事件生产者
     * 
     * @author Administrator
     *
     */
    public class OrderEvenProducer {
        private RingBuffer<OrderEvent> ringBuffer;// disruptor容器
    
        public OrderEvenProducer(RingBuffer<OrderEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void onData(Order order) {
            long index = ringBuffer.next();// 首先获取下一个事件槽位置
            try {
                OrderEvent orderEvent = ringBuffer.get(index);// 通过序列获得disruptorFacotry创建好的事件槽
                orderEvent.setOrder(order);// 填充好业务数据
            } finally {
                ringBuffer.publish(index);// 发布事件。使用finally保证publish调用
            }
    
        }
    }

    任务是 根据容器 往容器里面注册数据

    /**
     * 事件生产者
     * 
     * @author Administrator
     *
     */
    public class OrderEvenProducer {
        private RingBuffer<OrderEvent> ringBuffer;// disruptor容器
    
        public OrderEvenProducer(RingBuffer<OrderEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void onData(Order order) {
            long index = ringBuffer.next();// 首先获取下一个事件槽位置
            try {
                OrderEvent orderEvent = ringBuffer.get(index);// 通过序列获得disruptorFacotry创建好的事件槽
                orderEvent.setOrder(order);// 填充好业务数据
            } finally {
                ringBuffer.publish(index);// 发布事件。使用finally保证publish调用
            }
    
        }
    }

    5.测试

    package com.liqang.test;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    import bhz.base.LongEventHandler;
    
    public class OrderEventMain {
        public static void main(String[] args) {
            /**
             * 创建线程池 不限制大小 60秒不被使用就会被回收
             */
            ExecutorService executorService = Executors.newCachedThreadPool();
    
            OrderEventFactory factory = new OrderEventFactory();// 创建事件工厂
            // 创建bufferSize ,也就是RingBuffer大小,必须是2的N次方 diruptor减少计算事件槽的时间
            int ringBufferSize = 1024 * 1024; //
            /**
             * //BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
             * WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
             * //SleepingWaitStrategy
             * 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景
             * WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
             * //YieldingWaitStrategy
             * 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性
             * WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
             */
            // 初始化disruptorProducerType.SINGLE 表示是单生产者
            Disruptor<OrderEvent> disruptor = new Disruptor<>(factory, ringBufferSize, executorService, ProducerType.SINGLE,
                    new YieldingWaitStrategy());
            // 注册消费者事件处理器
            disruptor.handleEventsWith(new OrderEventHandle());
            // 启动
            disruptor.start();
            //创建生产者
            OrderEvenProducer orderEvenProducer=new OrderEvenProducer(disruptor.getRingBuffer());
            
            //模拟生产10个订单
            for (int i = 0; i <10; i++) {
                Order order=new Order();
                order.setId(i);
                order.setNum(i);
                order.setPid(i);
                order.setPid(i);
                orderEvenProducer.onData(order);
            }
            disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
            executorService.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;        
        }
    }

      disruptor3.0提供lambda表达式的方式(需要jdk8)发布事件 改造事件发布者类

    package com.liqang.test;
    
    import java.nio.ByteBuffer;
    
    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.RingBuffer;
    
    import bhz.base.LongEvent;
    
    /**
     * 事件生产者
     * 
     * @author Administrator
     *
     */
    public class OrderEvenProducer {
        private static final EventTranslatorOneArg<OrderEvent, Order> TRANSLATOR = 
                new EventTranslatorOneArg<OrderEvent, Order>() {
                    @Override
                    public void translateTo(OrderEvent event, long sequeue, Order order) {
                        event.setOrder(order);
                    }
                };
        
        private final RingBuffer<OrderEvent> ringBuffer;
        
        public OrderEvenProducer(RingBuffer<OrderEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
        
        public void onData(Order order){
            ringBuffer.publishEvent(TRANSLATOR, order);
            
            //ringBuffer.publishEvent((event,sequeue,or)->{event.setOrder(order);},order);
        }
    }

    直接使用RingBuffer

    package com.liqang.test;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    import com.lmax.disruptor.BatchEventProcessor;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.YieldingWaitStrategy;
    
    import bhz.generate1.Trade;
    import bhz.generate1.TradeHandler;
    
    public class RingBufferTest {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
    
         int bufferSize=1024*1024;
         //创建线程池
         ExecutorService executorService=Executors.newCachedThreadPool();
         //初始化ringbuffer
          RingBuffer<OrderEvent> ringBuffer=RingBuffer.createSingleProducer(new EventFactory<OrderEvent>() {
    
            @Override
            public OrderEvent newInstance() {
                // TODO Auto-generated method stub
                return new OrderEvent();
            }
        }, bufferSize, new YieldingWaitStrategy());
          //创建SequenceBarrier  
          SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
            
          //创建消息处理器  
          BatchEventProcessor<OrderEvent> transProcessor = new BatchEventProcessor<OrderEvent>(  
                  ringBuffer, sequenceBarrier, new OrderEventHandle());  
            
          //这一步的目的就是把消费者的位置信息引用注入到生产者    如果只有一个消费者的情况可以省略 
          ringBuffer.addGatingSequences(transProcessor.getSequence());  
            
          //把消息处理器提交到线程池  
          executorService.submit(transProcessor);  
          
          //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类  
            
          Future<?> future= executorService.submit(new Callable<Void>() {  
              @Override  
              public Void call() throws Exception {  
                  long seq;  
                  for(int i=0;i<10;i++){  
                      seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块  
                      OrderEvent orderEvent=ringBuffer.get(seq);
                      Order order=new Order();
                      order.setId(i);
                      order.setNum(i);
                      order.setPid(i);
                      order.setPid(i);
                      orderEvent.setOrder(order);
                      ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见  
                  }  
                  return null;  
              }  
          }); 
          
          future.get();//等待生产者结束  
          Thread.sleep(1000);//等上1秒,等消费都处理完成  
          transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)  
          executorService.shutdown();//终止线程  
    }
    }

    Disruptor做复杂业务操作

    disruptor还可以做很多复杂的业务操作

     如 一个事件c1 c2 处理器并行执行 执行完之后交给c3

     

    public class Main {  
        public static void main(String[] args) throws InterruptedException {  
           
            long beginTime=System.currentTimeMillis();  
            int bufferSize=1024;  
            ExecutorService executor=Executors.newFixedThreadPool(8);  
    
            Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
                @Override  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
            
            //菱形操作
           
            //使用disruptor创建消费者组C1,C2  
            EventHandlerGroup<Trade> handlerGroup = 
                    disruptor.handleEventsWith(new Handler1(), new Handler2());
            //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3 (测试遇到一个问题就是 要队列被消费完了才会走到3)
            handlerGroup.then(new Handler3());
           
             
            
            disruptor.start();//启动  
            CountDownLatch latch=new CountDownLatch(1);  
            //生产者准备  
            executor.submit(new TradePublisher(latch, disruptor));
            
            latch.await();//等待生产者完事. 
           
            disruptor.shutdown();  
            executor.shutdown();  
            System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
        }  
    }  
    package bhz.generate2;
    
    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    
    import bhz.generate1.Trade;
    
    import com.lmax.disruptor.EventTranslator;
    import com.lmax.disruptor.dsl.Disruptor;
    
    public class TradePublisher implements Runnable {  
        
        Disruptor<Trade> disruptor;  
        private CountDownLatch latch;  
        
        private static int LOOP=1000;//模拟百万次交易的发生  
      
        public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {  
            this.disruptor=disruptor;  
            this.latch=latch;  
        }  
      
        @Override  
        public void run() {  for(int i=0;i<LOOP;i++){  
                disruptor.getRingBuffer().publishEvent((event,sequeue,or)->{event.setId("ff");},new Trade());
            }  
            latch.countDown();  
        }  
          
    }  
      
    

     

    顺序执行

    c1执行后交给c2处理 c2 处理后交给c3处理

     package bhz.generate2;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import bhz.generate1.Trade;
    import bhz.generate1.TradeHandler;
    
    import com.lmax.disruptor.BusySpinWaitStrategy;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.EventHandlerGroup;
    import com.lmax.disruptor.dsl.ProducerType;
    
    public class Main {  
        public static void main(String[] args) throws InterruptedException {  
           
            long beginTime=System.currentTimeMillis();  
            int bufferSize=1024;  
            ExecutorService executor=Executors.newFixedThreadPool(8);  
    
            Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
                @Override  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
            
    
            //顺序操作
           
            disruptor.handleEventsWith(new Handler1()).
                handleEventsWith(new Handler2()).
                handleEventsWith(new Handler3());
          
            
        
            
            disruptor.start();//启动  
            CountDownLatch latch=new CountDownLatch(1);  
            //生产者准备  
            executor.submit(new TradePublisher(latch, disruptor));
            
            latch.await();//等待生产者完事. 
           
            disruptor.shutdown();  
            executor.shutdown();  
            System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
        }  
    }  

    六边形操作

     

     package bhz.generate2;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import bhz.generate1.Trade;
    import bhz.generate1.TradeHandler;
    
    import com.lmax.disruptor.BusySpinWaitStrategy;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.EventHandlerGroup;
    import com.lmax.disruptor.dsl.ProducerType;
    
    public class Main {  
        public static void main(String[] args) throws InterruptedException {  
           
            long beginTime=System.currentTimeMillis();  
            int bufferSize=1024;  
            ExecutorService executor=Executors.newFixedThreadPool(8);  
    
            Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
                @Override  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
             
            
            //六边形操作. 
         
            Handler1 h1 = new Handler1();
            Handler2 h2 = new Handler2();
            Handler3 h3 = new Handler3();
            Handler4 h4 = new Handler4();
            Handler5 h5 = new Handler5();
            disruptor.handleEventsWith(h1, h2);
            disruptor.after(h1).handleEventsWith(h4);
            disruptor.after(h2).handleEventsWith(h5);
            disruptor.after(h4, h5).handleEventsWith(h3);
            
            
            
            
            disruptor.start();//启动  
            CountDownLatch latch=new CountDownLatch(1);  
            //生产者准备  
            executor.submit(new TradePublisher(latch, disruptor));
            
            latch.await();//等待生产者完事. 
           
            disruptor.shutdown();  
            executor.shutdown();  
            System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
        }  
    }  

    h1 h2并行执行  h1执行完毕之后h4执行 h2执行完毕之后h5执行最终交给h3汇总执行

     

    转载于:https://www.cnblogs.com/LQBlog/p/8997846.html

    展开全文
  • Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
  • Disruptor使用指南

    千次阅读 2015-12-01 17:26:19
    Disruptor使用指南 Intruduction Lock vs CAS 避免伪共享 Linked Queue vs Array Ringbuffer 无时不刻的缓存 Component Sequence RingBuffer SequenceBarrier WaitStrategy BatchEvenProcess

    Disruptor使用指南

    声明:转自http://ziyue1987.github.io/,写的非常好,就拖过来学习下

    Mou icon

    Intruduction

    Intruduction

    Disruptor是Java实现的用于线程间通信的消息组件。其核心是一个Lock-free的Ringbuffer。我使用BlockingQueue与进Disruptor行了简单的对比测试,结果表明使用Disruptor来进行线程间通信效率会提高将近一倍。而LMAX给出的数据是,使用Disruptor能够在一个线程里每秒处理6百万订单。Disruptor为什么会如此快呢?通过参考Martin Fowler(Disruptor的开发者之一)的技术博客和Disruptor的源代码,可以总结出以下四条原因:

    Lock vs CAS

    Disruptor使用CAS而不是Lock。关于CAS(compare and swap)请参考WIKIPEDIA相关条目Compare and swap。与大部分并发队列使用的Lock相比,CAS显然要快很多。CAS是CPU级别的指令,更加轻量,不需要像Lock一样需要OS的支持,所以每次调用不需要kernel entry,也不需要context switch。当然,使用CAS的代价是Disruptor实现的复杂程度也相对提高了。

    避免伪共享

    现代计算机体系架构在各个层次都使用的Cache来提高效率,然而在多核体系结构中,对Cache的不恰当使用极易造成伪共享,使性能下降。关于伪共享请参考WIKIPEDIA相关条目False sharing。为了避免伪共享带来的性能下降,Disruptor对一切可能存在伪共享的地方使用Padding将两个不想关的内存隔离到两个缓存行上。可能存在伪共享的地方包括两个不相关的线程共享变量之间,线程私有变量和线程共享变量之间等。下面分别举例子说明。

    在Disruptor的实现中,有一个多线程共享的计数组件Sequence,对Sequence的操作可以说是整个Disruptor的核心,关于Sequence,在下文介绍各个组件的时候还要详细说明。这里主要说明它是怎样避免伪共享的。主要代码如下:

    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;
    
    static
    {
        UNSAFE = Util.getUnsafe();
        final int base = UNSAFE.arrayBaseOffset(long[].class);
        final int scale = UNSAFE.arrayIndexScale(long[].class);
        VALUE_OFFSET = base + (scale * 7);
    }
    
    private final long[] paddedValue = new long[15];
    
    . . .   . . .
    
    public long get()
    {
        return UNSAFE.getLongVolatile(paddedValue, VALUE_OFFSET);
    }
    

    Sequence定义了一个长度为15的long类型数组,但仅使用数组第八个元素进行计数,数组其他部分连同对象的头作为padding部分,保证在以64byte作为Cache缓存行大小的CPU中,计数元素(数组第八个元素)不会与其他变量存在于同一个缓存行中。关于Java中对象在内存中具体怎样布局,可以参考深入理解Java虚拟机:JVM高级特性与最佳实践

    另一个例子是关于线程私有变量的:

    private static class Padding
    {
        /** Set to -1 as sequence starting point */
        public long nextValue = Sequence.INITIAL_VALUE, cachedValue = Sequence.INITIAL_VALUE, p2, p3, p4, p5, p6, p7;
    }
    
    private final Padding pad = new Padding();
    

    这段代码用在单生产者的应用场景中。在这种应用场景下,这个计数器不需要是线程安全的,使用Sequence过于heavy了,但仍然需要通过padding将其与其他线程共享的变量隔离开来。变量p2到p7以及对象头的作用就是这个。

    Linked Queue vs Array Ringbuffer

    Disruptor选择使用Array Ringbuffer来构造lock-free队列,而不是选择Linked Queue。

    首先,数组是预分配的,这样不仅避免了Java GC带来的运行开销,而且因为在ringbuffer上进行的操作是顺序执行的,所以对缓存来说更加友好,保证了缓存命中率。使用Disruptor的时候,为了更好的利用Ringbuffer的这个优点,需要尽量将Ringbuffer的元素设计的可重用,生产者在生产消息或产生事件的时候对Ringbuffer元素中的属性进行更新,而不是替换Ringbuffer中的元素。

    其次,数组在定位元素的时候是使用索引,而链表在定位元素的时候使用对象引用(地址)。在lock-free队列中使用链表需要考虑使用Double-CAS等方式来克服ABA问题(关于double-CAS和ABA问题,请参coolshell上关于无锁队列的文章),而在数组中,因为元素是预分配的,所以不存在ABA问题。Disruptor使用递增的Sequence来标示不同时刻访问的相同元素,比如一个消费者的Sequence等于i的时候表示在访问Ringbuffer的某个位置的元素,在下一次访问这个位置的元素的时候,Seqence等于i + buffer_size。在需要访问数组元素的时候,只需要将序号对数组大小取余就可以得到数组索引。每次对ringbuffer的访问都会导致相应的Sequence增加。需要注意的是,由于Sequence是递增的,所以在到达最大值以后,会溢出,变成最小的负数,但这通常不是问题,因为要使long类型递增到溢出,即使每秒钟1000 000 000次递增,也需要上百年时间。

    无时不刻的缓存

    为了高效,Disruptor可谓无所不用其极,它绝不会放过任何利用缓存的机会,看一个例子。

    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
    
        long nextValue = pad.nextValue;
    
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        long cachedGatingSequence = pad.cachedValue;
    
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            long minSequence;
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                Thread.yield();
            }
    
            pad.cachedValue = minSequence;
        }
    
        pad.nextValue = nextSequence;
    
        return nextSequence;
    }
    

    这个函数是在单生产者的应用场景下生产者获取n个可用元素时执行的代码。在Disruptor里,需要多线程共享的序号,用Sequence表示,它是线程安全的,但访问Sequence的效率会因此降低,而在单线程内使用的序列号,是long类型,相对高效些。得益于序列号是递增的,就可以使用long类型缓存访问Seqence的结果,优先使用缓存的序号,只有当缓存的序号不满足条件时,才去访问Sequence。

    Component

    Sequence

    Sequence是Disruptor最核心的组件,上面已经提到过了。生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。那么Sequence是什么呢?首先Sequence是一个递增的序号,说白了就是计数器;其次,由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享。

    RingBuffer

    RingBuffer是存储消息的地方,通过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,可以在一个RingBuffer上实现消息的单播,多播,流水线以及它们的组合。其缺点是在生产者端判断RingBuffer是否已满是需要跟踪更多的信息,为此,在RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。

    SequenceBarrier

    SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系。在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。

    SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。

    WaitStrategy

    当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:

    • BusySpinWaitStrategy : 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。

    • BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大。

    • SleepingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。

    • YieldingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。

    • PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大。

    BatchEvenProcessor

    在Disruptor中,消费者是以EventProcessor的形式存在的。其中一类消费者是BatchEvenProcessor。每个BatchEvenProcessor有一个Sequence,来记录自己消费RingBuffer中消息的情况。所以,一个消息必然会被每一个BatchEvenProcessor消费。

    WorkProcessor

    另一类消费者是WorkProcessor。每个WorkProcessor也有一个Sequence,多个WorkProcessor还共享一个Sequence用于互斥的访问RingBuffer。一个消息被一个WorkProcessor消费,就不会被共享一个Sequence的其他WorkProcessor消费。这个被WorkProcessor共享的Sequence相当于尾指针。

    WorkerPool

    共享同一个Sequence的WorkProcessor可由一个WorkerPool管理,这时,共享的Sequence也由WorkerPool创建。

    Use Cases

    下面以Disruptor 3.2.0版本为例介绍Disruptor的初级使用方法,大部分代码是出自Disruptor源代码中得perftest部分(Disruptor代码这里下载)。

    消息定义

    Disruptor中消息对象可以自由定义,但是必须定义实现EventFactory接口的消息对象工厂来告诉RingBuffer如何初始化消息对象。

    public final class ValueEvent
    {
        private long value;
    
        public long getValue()
        {
            return value;
        }
    
        public void setValue(final long value)
        {
            this.value = value;
        }
    
        public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
        {
            public ValueEvent newInstance()
            {
                return new ValueEvent();
            }
        };
    }
    

    Producer

    Disruptor中同样没有定义生产者,而是由RingBuffer提供添加消息的接口。针对单生产者和多生产者两种应用场景,RingBuffer提供了不同的初始化方法:

    • 单生产者应用场景

      private final RingBuffer<ValueEvent> ringBuffer =
          createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
      
    • 多生产者应用场景

      private final RingBuffer<ValueEvent> ringBuffer =
          createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy());
      

    初始化的时候需要提供消息工厂,RingBuffer大小,以及一个选定的waitStrategy。向RingBuffer中添加消息的过程分成两阶段:1,申请可用节点,并将消息放入节点中;2,提交节点。

    // 阶段1:申请节点,并将消息放入节点中
    long next = rb.next();
    rb.get(next).setValue(0);
    
    // 阶段2:提交节点
    rb.publish(next);
    

    EventProcessor及其依赖关系

    Disruptor定义了两种EventProcessor:BatchEventProcessor和WorkProcessor。两种EventProcessor都实现了Runnable接口,在组装完成后可以直接放入线程中执行。

    用户需要实现自己的EventHandler来告诉EventProcessor在收到消息的时候怎样处理。

    用户还需要结合SequenceBarrier来构造各个EventProcessor之间及其和RingBuffer之间的依赖,关于依赖的定义,已经在上文解释过了。这里需要说明的是,我们在使用Queue构造pipeline的时候,类似于接水管,每一个步骤需要哪些处理,就用Queue接过去,处理完成后再用Queue接到下一个步骤。这种方式固然实现起来简单,但是消息需要穿过各个Queue,必要的时候还需要对消息进行复制,这会产生大量对Queue的并发访问操作,效率很低。在Disruptor里,相邻的两个步骤被解释成步骤2中的EventProcessor依赖步骤1中的EventProcessor,消息在RingBuffer中依次被步骤1中的EventProcessor和步骤2中的EventProcessor处理。

    不仅EventProcessor对RingBuffer有依赖,RingBuffer对EventProcessor也有反向依赖。RingBuffer需要保证在生产者比消费者快得情况下,新生产的消息不会覆盖未被完全消费(即被所有EventProcessor处理)的消息。为了做到这一点,RingBuffer会追踪有依赖关系的EventProcessor中最小的Sequence(如果不能根据依赖关系判断Sequence大小,则全部追踪)。需要追踪的Sequence会加入到RingBuffer的gatingSequences数组中。下面通过几个use case说明两种EventProcessor和RingBuffer如何组装。

    One Producer to one BatchEventProcessor

    这是最简单的场景,一个BatchEventProcessor

    // 构造RingBuffer
    private final RingBuffer<ValueEvent> ringBuffer =
        createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
    
    // 构造BatchEventProcessor 及依赖关系
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);
    
    // 构造反向依赖
    ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    

    One Producer to three BatchEventProcessors Pipeline

    三个BatchEventProcessor构成一个pipeline,对一个消息先后进行加工。

    // 构造RingBuffer
    private final RingBuffer<FunctionEvent> ringBuffer =
        createSingleProducer(FunctionEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
    
    // 构造BatchEventProcessor 及依赖关系
    // stepOneBatchProcessor依赖RingBuffer
    private final SequenceBarrier stepOneSequenceBarrier = ringBuffer.newBarrier();
    private final FunctionEventHandler stepOneFunctionHandler = new FunctionEventHandler(FunctionStep.ONE);
    private final BatchEventProcessor<FunctionEvent> stepOneBatchProcessor =
        new BatchEventProcessor<FunctionEvent>(ringBuffer, stepOneSequenceBarrier, stepOneFunctionHandler);
    
    // stepTwoBatchProcessor依赖RingBuffer和stepOneBatchProcessor
    private final SequenceBarrier stepTwoSequenceBarrier = ringBuffer.newBarrier(stepOneBatchProcessor.getSequence());
    private final FunctionEventHandler stepTwoFunctionHandler = new FunctionEventHandler(FunctionStep.TWO);
    private final BatchEventProcessor<FunctionEvent> stepTwoBatchProcessor =
        new BatchEventProcessor<FunctionEvent>(ringBuffer, stepTwoSequenceBarrier, stepTwoFunctionHandler);
    
    // stepThreeBatchProcessor依赖RingBuffer和stepTwoBatchProcessor
    private final SequenceBarrier stepThreeSequenceBarrier = ringBuffer.newBarrier(stepTwoBatchProcessor.getSequence());
    private final FunctionEventHandler stepThreeFunctionHandler = new FunctionEventHandler(FunctionStep.THREE);
    private final BatchEventProcessor<FunctionEvent> stepThreeBatchProcessor =
        new BatchEventProcessor<FunctionEvent>(ringBuffer, stepThreeSequenceBarrier, stepThreeFunctionHandler);
    
    // 构造反向依赖,stepThreeBatchProcessor的Sequence最小
    ringBuffer.addGatingSequences(stepThreeBatchProcessor.getSequence());
    

    One Producer to three BatchEventProcessors MultiCast

    一个消息被三个BatchEventProcessor处理,但没有先后关系。

    // 构造RingBuffer
    private final RingBuffer<ValueEvent> ringBuffer =
        createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
    
    // 构造BatchEventProcessor 及依赖关系
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    
    private final ValueMutationEventHandler[] handlers = new ValueMutationEventHandler[NUM_EVENT_PROCESSORS];
    
    handlers[0] = new ValueMutationEventHandler(Operation.ADDITION);
    handlers[1] = new ValueMutationEventHandler(Operation.SUBTRACTION);
    handlers[2] = new ValueMutationEventHandler(Operation.AND);
    
    private final BatchEventProcessor<?>[] batchEventProcessors = new BatchEventProcessor[3];
    
    batchEventProcessors[0] = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handlers[0]);
    batchEventProcessors[1] = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handlers[1]);
    batchEventProcessors[2] = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handlers[2]);
    
    // 构造反向依赖,三个EventProcessor没有依赖关系,将它们的Sequence全部加入
    ringBuffer.addGatingSequences(batchEventProcessors[0].getSequence(),
                                  batchEventProcessors[1].getSequence(),
                                  batchEventProcessors[2].getSequence());
    

    One Producer to two WorkProcessors

    一个消息只会被两个WorkProcessor中的一个处理。

    // 构造RingBuffer
    private final RingBuffer<ValueEvent> ringBuffer =
            RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY,
                                            BUFFER_SIZE,
                                            new YieldingWaitStrategy());
    
    // 构造拥有两个WorkProcessor的WorkerPool                                        
    private final EventCountingWorkHandler[] handlers = new EventCountingWorkHandler[2];
    for (int i = 0; i < 2; i++)
    {
        handlers[i] = new EventCountingWorkHandler(counters, i);
    }
    
    private final WorkerPool<ValueEvent> workerPool =
            new WorkerPool<ValueEvent>(ringBuffer,
                                       ringBuffer.newBarrier(),
                                       new FatalExceptionHandler(),
                                       handlers);
    
    // 构造反向依赖
    ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
    

    One Producer to two WorkerPools

    一个消息会被两个WorkerPool中的WorkProcessor处理,但在一个WorkerPool中只能被一个WorkProcessor处理。

    // 构造RingBuffer
    private final RingBuffer<ValueEvent> ringBuffer =
            RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY,
                                            BUFFER_SIZE,
                                            new YieldingWaitStrategy());
    
    SequenceBarrier barrier = ringBuffer.newBarrier();
    
    // 构造拥有两个WorkProcessor的WorkerPool
    private final EventCountingWorkHandler[] handlers = new EventCountingWorkHandler[4];
    for (int i = 0; i < 4; i++)
    {
        handlers[i] = new EventCountingWorkHandler(counters, i);
    }
    
    private final WorkerPool<LesStringEvent> workerPool0 =
            new WorkerPool<LesStringEvent>(ringBuffer,
                                           barrier,
                                           new FatalExceptionHandler(),
                                           handlers[0], handlers[1]);
    private final WorkerPool<LesStringEvent> workerPool1 =
            new WorkerPool<LesStringEvent>(ringBuffer,
                                           barrier,
                                           new FatalExceptionHandler(),
                                           handlers[2], handlers[3]);
    
    // 构造反向依赖
    ringBuffer.addGatingSequences(workerPool0.getWorkerSequences());
    ringBuffer.addGatingSequences(workerPool1.getWorkerSequences());
    

    结束语

    本文主要讲述了Disruptor得基本使用方法,涉及少量对实现的解释,意在通过Disruptor的使用管窥Disruptor的设计思路。如果有时间,就再写一篇关于Disruptor实现的文章。本文没有涉及Disruptor定义的DSL(领域特定语言)接口,DSL可以更方便的使用Disruptor。

    并发框架Disruptor译文:

    http://ifeve.com/disruptor/


    展开全文
  • disruptor使用笔记

    2018-09-10 16:16:50
    Disruptor是一种开源并发框架。简介: 1 能够在一个线程内每秒处理六百万个订单 2 能够在无锁的情况下实现网络的Queue并发操作 3 Disruptor是一种非常快的消息框架(轻量的JMS)也可以认为是观察者模式的一个实现...

    Disruptor是一种开源并发框架。简介:

    1 能够在一个线程内每秒处理六百万个订单

    2 能够在无锁的情况下实现网络的Queue并发操作

    3 Disruptor是一种非常快的消息框架(轻量的JMS)也可以认为是观察者模式的一个实现

     

    下面是对Disruptor的一个简单实现

    disruptor策略:

    BlockingWaitStrategy 是最低效的策略,但其对cpu的消耗最小并且在各种环境中提供更加一致的表现

    SleepingWaitStrategy 的性能与BlockingWaitStrategy差不多,cpu消耗也差不多,但对生产者线程影响最小,适合异步日志类

    YieldingWaitStrategy的性能是最好的,适合于低延迟的系统,在要求极高性能且事件处理线小于cpu逻辑核心数的场景中,使用此策略,例如cpu开启超线程特性(推荐使用)

     

    Disruptor d =new Disruptor<Event(自定义的一个bean)>(EventFactory factory,ringBufferSize,executor,ProducerType,

    YiedingWaitStrategy);

    d.handlerEventsWith(EventHandler e); //传入一个实现了EventHandler接口的对象,在该对象重写该接口的方法中写处理逻辑

    d.start();//启动disruptor

    RingBuffer ringBuffer = disruptor.getRingBuffer();//得到ringBuffer对象

    LongEventProducer producer  = new LongEventProducer(ringBuffer);//得到生产者实例对象,生产者自定义,定义模板见下面代码

    producer.onData();//填充并发布事件

    DisruptorTest类

    package dis;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.springframework.web.servlet.handler.DispatcherServletWebRequest;
    
    import com.lmax.disruptor.BusySpinWaitStrategy;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.EventHandlerGroup;
    import com.lmax.disruptor.dsl.ProducerType;
    
    public class DisruptorTest {
    
    	public static void main(String[] args) throws InterruptedException {
    		CountDownLatch countDownLatch = new CountDownLatch(10);//设置子线程准备数
    		//RingBuffer<LongEvent> = new R
    		ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 2);
    		@SuppressWarnings("deprecation")
    		Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new EventFactory<LongEvent>() {
    
    			public LongEvent newInstance() {
    				// TODO Auto-generated method stub
    				return new LongEvent();
    			}
    			
    		},1024*1024,executor, ProducerType.SINGLE, new YieldingWaitStrategy());	
    		
    		RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();//得到    RingBuffer
    		
    		LongEventHandler1 h1 = new LongEventHandler1();
    		LongEventHandler2 h2 = new LongEventHandler2();
    		LongEventHandler3 h3 = new LongEventHandler3();
    		LongEventHandler4 h4 = new LongEventHandler4();
    		LongEventHandler5 h5 = new LongEventHandler5();
    		
    		disruptor.handleEventsWith(h1,h2);//1236 1456 //六边形处理
    		disruptor.after(h1).handleEventsWith(h3);
    		disruptor.after(h2).handleEventsWith(h4);
    		disruptor.after(h3,h4).handleEventsWith(h5);
    		disruptor.start();
    		
    		for(int i = 0;i < 10;i++) {
    			new Thread(new LongEventProducerThread(countDownLatch, ringBuffer)).start();
    		}
    		countDownLatch.await();//等待所有子线程执行完毕
    		disruptor.shutdown();//关闭disruptor
    		executor.shutdown();
    		
    		
         }
    }

     

     

    LongEvent类

    package dis;
    
    public class LongEvent {
    
    	private int goodsPrice;
    	private String goodsName;
    	private int goodsId;
    	private String goodsType;
    	private String goodsSeller;
    	private String goodsBuyer;
    	
    	
    	public LongEvent(){
    		
    	}
    
    
    	public int getGoodsPrice() {
    		return goodsPrice;
    	}
    
    
    	public void setGoodsPrice(int goodsPrice) {
    		this.goodsPrice = goodsPrice;
    	}
    
    
    	public String getGoodsName() {
    		return goodsName;
    	}
    
    
    	public void setGoodsName(String goodsName) {
    		this.goodsName = goodsName;
    	}
    
    
    	public int getGoodsId() {
    		return goodsId;
    	}
    
    
    	public void setGoodsId(int goodsId) {
    		this.goodsId = goodsId;
    	}
    
    
    	public String getGoodsType() {
    		return goodsType;
    	}
    
    
    	public void setGoodsType(String goodsType) {
    		this.goodsType = goodsType;
    	}
    
    
    	public String getGoodsSeller() {
    		return goodsSeller;
    	}
    
    
    	public void setGoodsSeller(String goodsSeller) {
    		this.goodsSeller = goodsSeller;
    	}
    
    
    	public String getGoodsBuyer() {
    		return goodsBuyer;
    	}
    
    
    	public void setGoodsBuyer(String goodsBuyer) {
    		this.goodsBuyer = goodsBuyer;
    	}
    
    
    	
    }
    
    package dis;
    
    import com.lmax.disruptor.EventHandler;
    
    public class LongEventHandler0 implements EventHandler<LongEvent> {
    
    	@Override
    	public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
    		// TODO Auto-generated method stub
    		System.out.println("商品Id"+event.getGoodsId());
    	}
    
    	
    }
    package dis;
    
    import com.lmax.disruptor.RingBuffer;
    import com.sun.corba.se.impl.ior.ByteBuffer;
    
    public class LongEventProducer{
    
    	RingBuffer<LongEvent> ringBuffer;
    	
      public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
    	  this.ringBuffer = ringBuffer;
      }
      //发布数据
      public void onData(String s) {
    	  Long sequence = ringBuffer.next();//索取下一个事件槽
    	  try{
    	  LongEvent event = ringBuffer.get(sequence);//得到下一个事件槽中事件
    	  event.setGoodsBuyer("goodsBuyer");//填充数据
    	  event.setGoodsId(10010);
    	  event.setGoodsName("goodsName");
    	  event.setGoodsType("goodsType");
    	  event.setGoodsSeller("goodsSeller");
    	  event.setGoodsBuyer("goodsBuyer");
    	  }catch(Exception e) {
    		  
    	  }finally{
    		 ringBuffer.publish(sequence);//发布事件
    	  }
    	}
      
      
    }
    

     

    package dis;
    
    import java.util.concurrent.CountDownLatch;
    
    import com.lmax.disruptor.RingBuffer;
    
    public class LongEventProducerThread implements Runnable{
    
    	public LongEventProducerThread() {
    		
    	}
    	
    	CountDownLatch countDownLatch;
    	RingBuffer<LongEvent> ringBuffer;
    	
    	public LongEventProducerThread(CountDownLatch countDownLatch,RingBuffer<LongEvent> ringBuffer) {
    		// TODO Auto-generated constructor stub
    	this.countDownLatch = countDownLatch;
    	this.ringBuffer = ringBuffer;
    	}
    	
    	@Override
    	public void run() {
    		LongEventProducer producer = new LongEventProducer(ringBuffer);
    		for(int i = 0;i < 1;i++) {
    		producer.onData("a");
    		}
    		countDownLatch.countDown();
    	}
    }
    

     

    展开全文
  • Disruptor使用入门

    2016-11-30 18:08:32
     工作中用到了这个Disruptor东东,于是看了几篇博客,下面记录下,以便自己研究  已经不记得最早接触到 Disruptor 是什么时候了,只记得发现它的时候它是以具有闪电般的速度被介绍的。于是在脑子里, Disruptor ...

     转载自http://www.cnblogs.com/haiq/p/4112689.html

            工作中用到了这个Disruptor东东,于是看了几篇博客,下面记录下,以便自己研究

      已经不记得最早接触到 Disruptor 是什么时候了,只记得发现它的时候它是以具有闪电般的速度被介绍的。于是在脑子里, Disruptor 和“闪电”一词关联了起来,然而却一直没有时间去探究一下。
    最近正在进行一项对性能有很高要求的产品项目的研究,自然想起了闪电般的 Disruptor ,这必有它的用武之地,于是进行了一番探查,将成果和体会记录在案。

    一、什么是 Disruptor 

    从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。

    可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。

    我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者(Consumer)能获得通知;如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。

    这些都是 Disruptor 能做到的,与之不同的是,Disruptor 能做更多:

    • 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);
    • 预分配用于存储事件内容的内存空间;
    • 针对极高的性能目标而实现的极度优化和无锁的设计;

    以上的描述虽然简单地指出了 Disruptor 是什么,但对于它“能做什么”还不是那么直截了当。一般性地来说,当你需要在两个独立的处理过程(两个线程)之间交换数据时,就可以使用 Disruptor 。当然使用队列(如上面提到的 BlockingQueue)也可以,只不过 Disruptor 做得更好。

    拿队列来作比较的做法弱化了对 Disruptor 有多强大的认识,如果想要对此有更多的了解,可以仔细看看 Disruptor 在其东家 LMAX 交易平台(也是实现者) 是如何作为核心架构来使用的,这方面就不做详述了,问度娘或谷哥都能找到。

    二、Disruptor 的核心概念

    先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

    • Ring Buffer
      如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
    • Sequence  Disruptor
      通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
      (注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。
    • Sequencer 
      Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
    • Sequence Barrier
      用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
    • Wait Strategy
      定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
    • Event
      在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
    • EventProcessor
      EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
    • EventHandler
      Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
    • Producer
      即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。


    三、如何使用 Disruptor 

    Disruptor 的 API 十分简单,主要有以下几个步骤:

    1. 定义事件
      事件(Event)就是通过 Disruptor 进行交换的数据类型。
    public class LongEvent
    {
        private long value;
    
        public void set(long value)
        {
            this.value = value;
        }
    }
       2.定义事件工厂
    事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory<T>。
    Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。
    一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。

    import com.lmax.disruptor.EventFactory;
    
    public class LongEventFactory implements EventFactory<LongEvent>
    {
        public LongEvent newInstance()
        {
            return new LongEvent();
        }
    }

     3. 定义事件处理的具体实现
    通过实现接口 com.lmax.disruptor.EventHandler<T> 定义事件处理的具体实现。
    import com.lmax.disruptor.EventHandler;
    
    public class LongEventHandler implements EventHandler<LongEvent>
    {
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
        {
            System.out.println("Event: " + event);
        }
    }
    4.定义用于事件处理的线程池
    Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。例如:
    ExecutorService executor = Executors.newCachedThreadPool();
    5.指定等待策略
    Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。
    Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。
    例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,其中,
    BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;
    SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;
    YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
    WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
    WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
    WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    6.启动 Disruptor
    EventFactory<LongEvent> eventFactory = new LongEventFactory();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方;
            
    Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,
                    ringBufferSize, executor, ProducerType.SINGLE,
                    new YieldingWaitStrategy());
            
    EventHandler<LongEvent> eventHandler = new LongEventHandler();
    disruptor.handleEventsWith(eventHandler);
            
    disruptor.start();
    发布事件
    7.Disruptor 的事件发布过程是一个两阶段提交的过程:
      第一步:先从 RingBuffer 获取下一个可以写入的事件的序号;
      第二步:获取对应的事件对象,将数据写入事件对象;
      第三部:将事件提交到 RingBuffer;
    事件只有在提交之后才会通知 EventProcessor 进行处理;
    // 发布事件;
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    long sequence = ringBuffer.next();//请求下一个事件序号;
        
    try {
        LongEvent event = ringBuffer.get(sequence);//获取该序号对应的事件对象;
        long data = getEventData();//获取要通过事件传递的业务数据;
        event.set(data);
    } finally{
        ringBuffer.publish(sequence);//发布事件;
    }

     注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。

    Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。
    static class Translator implements EventTranslatorOneArg<LongEvent, Long>{
        @Override
        public void translateTo(LongEvent event, long sequence, Long data) {
            event.set(data);
        }    
    }
        
    public static Translator TRANSLATOR = new Translator();
        
    public static void publishEvent2(Disruptor<LongEvent> disruptor) {
        // 发布事件;
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        long data = getEventData();//获取要通过事件传递的业务数据;
        ringBuffer.publishEvent(TRANSLATOR, data);
    }
    此外,Disruptor 要求 RingBuffer.publish 必须得到调用的潜台词就是,如果发生异常也一样要调用 publish ,那么,很显然这个时候需要调用者在事件处理的实现上来判断事件携带的数据是否是正确的或者完整的,这是实现者应该要注意的事情。
    8 关闭 Disruptor

    disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
    executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;

    四、性能对比测试

      为了直观地感受 Disruptor 有多快,设计了一个性能对比测试:Producer 发布 100 万次事件,从发布第一个事件开始计时,捕捉 Consumer 处理完所有事件的耗时。

      测试用例在 Producer 如何将事件通知到 Consumer 的实现方式上,设计了三种不同的实现:

    1. Producer 的事件发布和 Consumer 的事件处理都在同一个线程,Producer 发布事件后立即触发 Consumer 的事件处理;
    2. Producer 的事件发布和 Consumer 的事件处理在不同的线程,通过 ArrayBlockingQueue 传递给 Consumer 进行处理;
    3. Producer 的事件发布和 Consumer 的事件处理在不同的线程,通过 Disruptor 传递给 Consumer 进行处理;

    此次测试用例仅做了只有一个 Producer 和一个 Consumer 的情形,测试用例的代码如下:

    CounterTracer tracer = tracerFactory.newInstance(DATA_COUNT);//计数跟踪到达指定的数值;
    TestHandler handler = new TestHandler(tracer);//Consumer 的事件处理;
            
    EventPublisher publisher = publisherFactory.newInstance(new PublisherCreationArgs(DATA_COUNT, handler));//通过工厂对象创建不同的 Producer 的实现;
    publisher.start();
    tracer.start();
            
    //发布事件;
    for (int i = 0; i < DATA_COUNT; i++) {
        publisher.publish(i);
    }
            
    //等待事件处理完成;
    tracer.waitForReached();
            
    publisher.stop();
            
    //输出结果;
    printResult(tracer);
    复制代码
    事件处理的实现只是调用一个计数器(CounterTracer)加1,该计数器跟踪从开始到达到总的事件次数时所耗的时间。
    public class TestHandler {
        
        private CounterTracer tracer;
        
        public TestHandler(CounterTracer tracer) {
            this.tracer = tracer;
        }
        
        /**
         * 如果返回 true,则表示处理已经全部完成,不再处理后续事件;
         * 
         * @param event
         * @return
         */
        public boolean process(TestEvent event){
            return tracer.count();
        }
    }
    针对单一Producer 和单一 Consumer 的测试场景,CounterTracer 的实现如下:
    /**
     * 测试结果跟踪器,计数器不是线程安全的,仅在单线程的 consumer 测试中使用;
     * 
     * @author haiq
     *
     */
    public class SimpleTracer implements CounterTracer {
    
        private long startTicks;
        private long endTicks;
        private long count = 0;
        private boolean end = false;
        private final long expectedCount;
        private CountDownLatch latch = new CountDownLatch(1);
    
        public SimpleTracer(long expectedCount) {
            this.expectedCount = expectedCount;
        }
    
        @Override
        public void start() {
            startTicks = System.currentTimeMillis();
            end = false;
        }
    
        @Override
        public long getMilliTimeSpan() {
            return endTicks - startTicks;
        }
    
        @Override
        public boolean count() {
            if (end) {
                return end;
            }
            count++;
            end = count >= expectedCount;
            if (end) {
                endTicks = System.currentTimeMillis();
                latch.countDown();
            }
            return end;
        }
    
        @Override
        public void waitForReached() throws InterruptedException {
            latch.await();
        }
    }
    第一种 Producer 的实现:直接触发事件处理;

    public class DirectingPublisher implements EventPublisher {
        
        private TestHandler handler;    
        private TestEvent event = new TestEvent();
        
        public DirectingPublisher(TestHandler handler) {
            this.handler = handler;
        }
    
        @Override
        public void publish(int data) throws Exception {
            event.setValue(data);
            handler.process(event);
        }
    
        //省略其它代码;    
    }
    第二种 Producer 的实现:通过 ArrayBlockinigQueue 实现;
    public class BlockingQueuePublisher implements EventPublisher {
        
        private ArrayBlockingQueue<TestEvent> queue ;    
        private TestHandler handler;    
        public BlockingQueuePublisher(int maxEventSize, TestHandler handler) {
            this.queue = new ArrayBlockingQueue<TestEvent>(maxEventSize);
            this.handler = handler;
        }
    
        public void start(){
            Thread thrd = new Thread(new Runnable() {
                @Override
                public void run() {
                    handle();
                }
            });
            thrd.start();
        }
        
        private void handle(){
            try {
                TestEvent evt ;
                while (true) {
                    evt = queue.take();
                    if (evt != null && handler.process(evt)) {
                        //完成后自动结束处理线程;
                        break;
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void publish(int data) throws Exception {
            TestEvent evt = new TestEvent();
            evt.setValue(data);
            queue.put(evt);
        }
    
        //省略其它代码;
    }
    第三种 Producer 的实现:通过 Disruptor 实现;

    public class DisruptorPublisher implements EventPublisher {
    
        private class TestEventHandler implements EventHandler<TestEvent> {
    
            private TestHandler handler;
    
            public TestEventHandler(TestHandler handler) {
                this.handler = handler;
            }
    
            @Override
            public void onEvent(TestEvent event, long sequence, boolean endOfBatch)
                    throws Exception {
                handler.process(event);
            }
    
        }
        
        private static final WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    
        private Disruptor<TestEvent> disruptor;
        private TestEventHandler handler;
        private RingBuffer<TestEvent> ringbuffer;    
        private ExecutorService executor;
    
        public DisruptorPublisher(int bufferSize, TestHandler handler) {
            this.handler = new TestEventHandler(handler);
            executor = Executors.newSingleThreadExecutor();
            disruptor = new Disruptor<TestEvent>(EVENT_FACTORY, bufferSize,
                    executor, ProducerType.SINGLE,
                    YIELDING_WAIT);
        }
    
        @SuppressWarnings("unchecked")
        public void start() {
            disruptor.handleEventsWith(handler);
            disruptor.start();
            ringbuffer = disruptor.getRingBuffer();
        }
    
        @Override
        public void publish(int data) throws Exception {
            long seq = ringbuffer.next();
            try {
                TestEvent evt = ringbuffer.get(seq);
                evt.setValue(data);
            } finally {
                ringbuffer.publish(seq);
            }
        }
    
        //省略其它代码;
    }

    Producer 第一种实现并没有线程间的交换,实际上就是直接调用计数器,因此以此种实现的测试结果作为基准,对比其它的两种实现的测试结果。

    在我的CPU CORE i5 / 4G 内存 / Win7 64 位的笔记本上,数据量(DATA_COUNT)取值为 1024 * 1024 时的测试结果如下:

    【基准测试】
    [1]--每秒吞吐量:--;(1048576/0ms)
    [2]--每秒吞吐量:--;(1048576/0ms)
    [3]--每秒吞吐量:--;(1048576/0ms)
    [4]--每秒吞吐量:69905066;(1048576/15ms)
    [5]--每秒吞吐量:--;(1048576/0ms)
    【对比测试1: ArrayBlockingQueue 实现】
    [1]--每秒吞吐量:4788018;(1048576/219ms)
    [2]--每秒吞吐量:5165399;(1048576/203ms)
    [3]--每秒吞吐量:4809981;(1048576/218ms)
    [4]--每秒吞吐量:5165399;(1048576/203ms)
    [5]--每秒吞吐量:5577531;(1048576/188ms)
    【对比测试2: Disruptor实现】
    [1]--每秒吞吐量:33825032;(1048576/31ms)
    [2]--每秒吞吐量:65536000;(1048576/16ms)
    [3]--每秒吞吐量:65536000;(1048576/16ms)
    [4]--每秒吞吐量:69905066;(1048576/15ms)
    [5]--每秒吞吐量:33825032;(1048576/31ms)

    从测试结果看, Disruptor 的性能比 ArrayBlockingQueue 高出了几乎一个数量级,操作耗时也只有平均20毫秒左右。

    由于篇幅有限,关于 Disruptor 实现高性能的原理,留待以后再做探讨。

     

    六、参考资料

    1. Diruptor 页面:https://github.com/LMAX-Exchange/disruptor
    2. http://ifeve.com/locks-are-bad/
    3. http://xsh5324.iteye.com/blog/2058925?utm_source=tool.lu
    4. http://ifeve.com/disruptor-getting-started/
    5. http://blog.csdn.net/workformywork/article/details/38359447?utm_source=tuicool&utm_medium=referral
    6. http://ifeve.com/disruptor/
    展开全文
  • 本文译自Dirsruptor在...获取Disruptor Disruptor jar包可以从maven仓库mvnrepository获取,可以将其集成进项目的依赖管理中。 <dependency> <groupId>com.lmax</groupId> <artifactId>d...
  • Disruptor使用笔记

    千次阅读 2014-04-20 23:51:46
    在一个生产者和一个消费者的场景中测试表明,无锁队列相比有锁队列,qps有大约10倍的提升,latency更是有几百倍的提升。不管怎么样,现在大家都...所以这些无锁的数据结构和算法,可以尝试借鉴来使用在合适的场景中。
  • disruptor使用示例

    千次阅读 2015-05-15 19:28:39
    Disruptor<LongEvent> disruptor = new Disruptor(factory, bufferSize,executor, ProducerType.SINGLE, new BlockingWaitStrategy()); //设置事件业务处理器---消费者 disruptor.handleEventsWith...
  • HBase中Disruptor使用

    千次阅读 2016-03-19 17:19:55
    我们的循环缓冲区是一个LMAX Disruptor。当多个线程在单个WAL竞争append和sync时,它试图最小化同步与volatile写。 // Disruptor配置为处理多个生产者和仅有一个消费者(HBase中的生产者是调用append、sync的IPC ...
  • <div><p>ReadIndexResponseClosure里持有RingBuffer里的ReadIndexEvent对象,发起node.handleReadIndexRequest请求异步回调Closure,这里切换了线程,ReadIndexEvent里面的值是有可能被addRequest...
  • 最近看了一下部署游戏后台的服务器状况,发现我的一个Java程序其占用的CPU时长超过100%,排查后发现竟是Disruptor引起的,让我们来看看究竟为什么Disruptor会有这样的表现。 发现占用CPU时间超过100%的进程 首先是在...
  • Disruptor<PersonEntity> disruptor = new Disruptor(eventFactory, bufferSize, producerFactory, ProducerType.SINGLE, new BlockingWaitStrategy()); int threadNum = Runtime.getRuntime()....
  • 什么是DisruptorDisruptor是一个开源的JAVA...Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来...
  • Disruptor使用

    2017-09-21 15:39:52
    其核心是一个Lock-free的Ringbuffer,Disruptor使用CAS而不是Lock。与大部分并发队列使用的Lock相比,CAS显然要快很多。CAS是CPU级别的指令,更加轻量,不需要像Lock一样需要OS的支持,所以每次调用不需要kernel ...
  • disruptor使用和分析

    千次阅读 2016-04-16 09:49:35
    jdk:1.8介绍:disruptor是典型的生产者和消费者模式,disruptor使用RingBuffer存放数据,sequence管理生产和消费的位置(long类型,递增),生产者要生产数据的时候首先向生产者请求一个sequence,然后在sequence的...
  • Java 使用Disruptor...Disruptor使用观察者模式, 主动将消息发送给消费者, 而不是等消费者从队列中取; 在无锁的情况下, 实现queue(环形, RingBuffer)的并发操作, 性能远高于BlockingQueue。 二、Disruptor的优化策略
  • disruptor使用

    2020-03-19 09:58:38
    已经不记得最早接触到 Disruptor 是什么时候了,只记得发现它的时候它是以具有闪电般的速度被介绍的。于是在脑子里, Disruptor 和“闪电”一词关联了起来,然而却一直没有时间去探究一下。 最近正在进行一项对...
  • Disruptor简单使用

    2019-03-25 16:02:00
      Disruptor从功能上来说,可以实现队列的功能,也可以把它当成单机版的JMS来看待。从性能上来说,它比ArrayBlockingQueue有更好的性能表现,对于生产者消费者模型的业务,Disruptor是一个更好的选择可以很好的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 689
精华内容 275
关键字:

disruptor使用