精华内容
下载资源
问答
  • disruptor
    2020-11-25 16:31:26

    Disruptor是一个开源的并发框架,提供了类似于Java中有界队列的功能,主要用于生产消费者场景。与Java原生并发队列不同的是,Disruptor高度优化,在单机上可以轻松跑到千万级别的tps与ns级别的延时

    Disruptor的设计思想

    • 无锁:加锁会带来线程的切换,这个是非常耗时的,Disruptor使用了CAS和自旋,提高了性能。同样也使用了序号栅栏和序号配合使用避免了锁和CAS操作。
    • 消除伪共享:现代处理器的缓存用法对于成功的高性能操作至关重要。我们的操作系统用64字节的缓存行进行缓存操作。如果有两个变量在相同的缓存行中,且这两个变量是被不同线程写入,这是就会存在写入争用问题,导致缓存行失效。Disruptor用在Long变量的左右增加其他7个填充Long变量,让变量单独存在一个缓存行中,消除了伪共享。
    • 环形缓冲:Disruptor中使用了环形结构,无需浪费大量的空间存储,可以循环利用。底层使用数组结构,而不是使用链表,性能要更好,每个元素都是在在启动时预分配的。所有的元素在Disruptor实例生存周期中都将重用并一直存在。元素在消费者消费完成之后,生产者会对当前未知的元素进行重用。这样避免了垃圾的产生。

    Disruptor Quick Start

    我们先写一个hello world 程序。要实现Disruptor需要四步:

    • 建立一个工厂Event类,用于创建Event类实例对象。
    • 需要有一个监听事件类,用于处理数据(Event类)。
    • 实例化Disruptor实例,配置一系列参数,编写Disruptor核心组件。
    • 编写生产者组件,向Disruptor容器中去投递数据。

    首先我们创建Event类。

    public class OrderEvent {
        private long value;
    
        public long getValue() {
            return value;
        }
    
        public void setValue(long value) {
            this.value = value;
        }
    }
    

    创建工厂类,返回一个未赋值的OrderEvent实例。

    public class OrderEventFactory implements EventFactory<OrderEvent> {
        @Override
        public OrderEvent newInstance() {
            return new OrderEvent();
        }
    }
    

    编写一个消费者,这里需要实现一个接口。

    public class OrderEventHandler implements EventHandler<OrderEvent> {
        @Override
        public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
            System.out.println("消费者" + orderEvent.getValue());
        }
    }
    

    编写生产者,这里需要接收RingBuffer实例,用来发送消息。

    public class OrderEventProducer {
        private RingBuffer<OrderEvent> ringBuffer;
    
        public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void sendData(long value) {
            //1 在生产者发送消息时,首先需要从我们的ringBuffer里面 获取一个可用的序号
            long sequence = ringBuffer.next();
    
            try {
                //2 根据这个序号,找到具体的"OrderEvent"元素。注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
                OrderEvent orderEvent = ringBuffer.get(sequence);
                //3 进行实际的赋值操作
                orderEvent.setValue(value);
            } finally {
                //提交发布操作
                ringBuffer.publish(sequence);
            }
        }
    }
    

    主方法中根据上面的步骤实现Disruptor简单的生产和消费。

    public class Main {
        public static void main(String[] args) {
            //准备参数工作
            OrderEventFactory orderEventFactory = new OrderEventFactory();
            int ringBufferSize = 1024 * 1024;
            ThreadFactory springThreadFactory = new CustomizableThreadFactory("springThread-pool-");
    
            /**
             * 1 orderEventFactory: 消息(Event)工厂对象
             * 2 ringBufferSize: 容器的长度
             * 3 threadFactory: 线程池工厂
             * 4 ProducerType: 单生产者或多生产者
             * 5 waitStrategy: 等待策略
             */
            //1. 实例化Disruptor对象
            Disruptor<OrderEvent> disruptor = new Disruptor(orderEventFactory,
                    ringBufferSize,
                    springThreadFactory,
                    ProducerType.SINGLE,
                    new BlockingWaitStrategy());
    
            //2. 添加消费者监听(构建Disruptor与消费者的一个关联关系)
            disruptor.handleEventsWith(new OrderEventHandler());
    
            //3. 启动disruptor
            disruptor.start();
    
            //4. 获取实际存储数据的容器:RingBuffer
            RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
    
            //5. 创建生产者
            OrderEventProducer producer = new OrderEventProducer(ringBuffer);
            for(long i=0;i<100;i++) {
                producer.sendData(i);
            }
    
            //6. 关闭资源
            disruptor.shutdown();
        }
    }
    

    启动之后打印消费日志。

    消费者0
    消费者1
    消费者2
    ...
    消费者98
    消费者99
    

    Disruptor 串行消费和并行消费

    首先我们创建Event类,模拟交易信息。

    public class Trade {
        private String id;
        private String name;
        private double price;
        private AtomicInteger count = new AtomicInteger(0);
    
        public String getName() {
            return name;
        }
    
        public String getId() {
            return id;
        }
    
        public AtomicInteger getCount() {
            return count;
        }
    
        public double getPrice() {
            return price;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public void setCount(AtomicInteger count) {
            this.count = count;
        }
    
        public void setPrice(double price) {
            this.price = price;
        }
    }
    

    交易信息生产者。TradePubshlisher中使用Disruptor进行生产event,通过循环设置一次发送的event数量,需要传入的对象要实现EventTranslator接口。

    public class TradePubshlisher implements Runnable {
        private CountDownLatch latch;
        private Disruptor<Trade> disruptor;
    
        private static int PUBLISH_COUNT = 10;
    
        public TradePubshlisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
            this.latch = latch;
            this.disruptor = disruptor;
        }
    
        @Override
        public void run() {
            //新的提交任务方式
            for(int i=0;i<PUBLISH_COUNT;i++) {
                disruptor.publishEvent(new TradeEventTranslator());
            }
            latch.countDown();
        }
    }
    
    class TradeEventTranslator implements EventTranslator<Trade> {
        private Random random = new Random();
    
        @Override
        public void translateTo(Trade trade, long l) {
            this.generateTrade(trade);
        }
    
        private void generateTrade(Trade event) {
            event.setPrice(random.nextDouble() * 9999);
        }
    }
    

    创建5个消费者。这里可以通过两种方式实现,实现EventHandler接口或实现WorkHandler。

    public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {
        @Override
        public void onEvent(Trade trade, long l, boolean b) throws Exception {
            this.onEvent(trade);
        }
    
        @Override
        public void onEvent(Trade trade) throws Exception {
            System.err.println("handler 1 : SET NAME");
            trade.setName("H1");
            Thread.sleep(1000);
        }
    }
    
    public class Handler2 implements EventHandler<Trade> {
        @Override
        public void onEvent(Trade trade, long l, boolean b) throws Exception {
            System.err.println("handler 2 : SET ID");
            trade.setId(UUID.randomUUID().toString());
            Thread.sleep(2000);
        }
    }
    
    public class  Handler3 implements EventHandler<Trade> {
    
        @Override
        public void onEvent(Trade trade, long l, boolean b) throws Exception {
            System.err.println("handler 3 : NAME: " + trade.getName() + ", ID: " + trade.getId() + " INSTANCE" + trade.toString());
        }
    }
    
    public class Handler4 implements EventHandler<Trade> {
    
        @Override
        public void onEvent(Trade trade, long l, boolean b) throws Exception {
            System.err.println("handler 4 : SET PRICE");
            trade.setPrice(17.0);
        }
    }
    
    public class Handler5 implements EventHandler<Trade> {
        @Override
        public void onEvent(Trade trade, long l, boolean b) throws Exception {
            System.err.println("handler 5 : GET PRICE : " + trade.getPrice());
            trade.setPrice(trade.getPrice() + 3.0);
        }
    }
    

    主方法创建Disruptor并启动,调用生产者进行生产Event。此时准备工作就已经完成。

    串行消费

    串行消费

            disruptor.handleEventsWith(new Handler1())
                    .handleEventsWith(new Handler2())
                    .handleEventsWith(new Handler3());
    

    并行消费有两种方式,第一种:

            disruptor.handleEventsWith(new Handler1());
            disruptor.handleEventsWith(new Handler2());
            disruptor.handleEventsWith(new Handler3());
    

    第二种:

    disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());
    

    菱形操作

    handler1与handler2并行,handler3串行。

    方式一:

            disruptor.handleEventsWith(new Handler1(), new Handler2())
                    .handleEventsWith(new Handler3());
    

    方式二:

            EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
            handlerGroup.then(new Handler3());
    

    上面的Handler4和Handler5还没有使用,我们现在实现这种执行流程。
    在这里插入图片描述

            Handler1 handler1 = new Handler1();
            Handler2 handler2 = new Handler2();
            Handler3 handler3 = new Handler3();
            Handler4 handler4 = new Handler4();
            Handler5 handler5 = new Handler5();
            disruptor.handleEventsWith(handler1, handler4);
            disruptor.after(handler1).handleEventsWith(handler2);
            disruptor.after(handler4).handleEventsWith(handler5);
            disruptor.after(handler2, handler5).handleEventsWith(handler3);
    

    多生产者和多消费者

    创建Event类。

    public class Order {
        private String id;
        private String name;
        private double price;
    
        public String getName() {
            return name;
        }
    
        public String getId() {
            return id;
        }
    
        public double getPrice() {
            return price;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public void setPrice(double price) {
            this.price = price;
        }
    }
    

    发送者通过ringBuffer获取序号上对应的元素,进行消息发送。

    public class OrderEventProducer {
        private RingBuffer<Order> ringBuffer;
    
        public OrderEventProducer(RingBuffer<Order> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void sendData(String uuid) {
            //1 在生产者发送消息时,首先需要从我们的ringBuffer里面 获取一个可用的序号
            long sequence = ringBuffer.next();
    
            try {
                //2 根据这个序号,找到具体的"OrderEvent"元素。注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
                Order order = ringBuffer.get(sequence);
                //3 进行实际的赋值操作
                order.setId(uuid);
            } finally {
                //提交发布操作
                ringBuffer.publish(sequence);
            }
        }
    }
    

    消费者收到消息之后进行打印。

    public class Consumer implements WorkHandler<Order> {
        private String consumerId;
    
        private static AtomicInteger count = new AtomicInteger();
    
        private Random random = new Random();
    
        public Consumer(String consumerId) {
            this.consumerId = consumerId;
        }
    
        @Override
        public void onEvent(Order order) throws Exception {
            Thread.sleep(1 * random.nextInt(5));
            System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + order.getId());
            count.incrementAndGet();
        }
    
        public int getCount() {
            return count.get();
        }
    }
    

    在主类中分成几步进行多生产者的发送和多消费者的接收:

    • 创建RingBuffer。
    • 通过ringBuffer 创建一个屏障。
    • 创建多个消费者。
    • 构造多消费者工作池。
    • 设置多个消费者的sequence序号 用于单独统计消费进度 并且设置到ringbuffer中。
    • 启动WorkerPool。
    • 创建多个线程发送数据。
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            //1 创建RingBuffer
            RingBuffer<Order> orderRingBuffer = RingBuffer.create(ProducerType.MULTI, () -> new Order(), 1024 * 1024, new YieldingWaitStrategy());
    
            //2 通过ringBuffer 创建一个屏障
            SequenceBarrier sequenceBarrier = orderRingBuffer.newBarrier();
    
            //3 创建多个消费者
            Consumer[] consumers = new Consumer[10];
            for (int i = 0; i < consumers.length; i++) {
                consumers[i] = new Consumer("C" + i);
            }
    
            //4 构造多消费者工作池
            WorkerPool<Order> orderWorkerPool = new WorkerPool<>(orderRingBuffer,
                    sequenceBarrier,
                    new EventExceptionHandler(),
                    consumers);
    
            //5 设置多个消费者的sequence序号 用于单独统计消费进度 并且设置到ringbuffer中
            orderRingBuffer.addGatingSequences(orderWorkerPool.getWorkerSequences());
    
            //6 启动WorkerPool
            orderWorkerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    		//7 创建多个线程发送数据
            CountDownLatch latch = new CountDownLatch(1);
            for (int i = 0; i < 100; i++) {
                OrderEventProducer producer = new OrderEventProducer(orderRingBuffer);
                new Thread(() -> {
                    try {
                        //阻塞等待发数据
                        latch.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    for(int j=0;j<100;j++) {
                        producer.sendData(UUID.randomUUID().toString());
                    }
    
                }).start();
            }
    
            Thread.sleep(3000);
            System.err.println("------------线程创建完毕,开始生产数据---------------");
            latch.countDown();
            //等待生产数据
            Thread.sleep(5000);
    
            System.err.println("总任务数" + consumers[2].getCount());
        }
    }
    

    这里需要有一个处理异常的类,我们暂且空实现。

    public class EventExceptionHandler implements ExceptionHandler<Order> {
        @Override
        public void handleEventException(Throwable throwable, long l, Order order) {
    
        }
    
        @Override
        public void handleOnStartException(Throwable throwable) {
    
        }
    
        @Override
        public void handleOnShutdownException(Throwable throwable) {
    
        }
    }
    

    启动服务后,我们可以看到多个消费者同时对消息进行消费。

    当前消费者: C6, 消费信息ID: 1fdc8e24-2e30-4a08-84e8-73448b51ead2
    当前消费者: C3, 消费信息ID: 9c296537-f39c-48e3-b3d8-657d8367956b
    当前消费者: C1, 消费信息ID: 26fd113f-f203-4b0e-b0f0-57c51aba59cc
    当前消费者: C2, 消费信息ID: 4ea05ffc-dc95-408e-91c9-7dc4fa208bf4
    当前消费者: C7, 消费信息ID: 1a977120-714c-458e-8ff2-6b33bf643ae2
    当前消费者: C0, 消费信息ID: 5b9034d7-8141-455f-ab2e-df490a31a0f8
    当前消费者: C4, 消费信息ID: 20e1e9af-b2f1-464b-96ab-a0c727545021
    当前消费者: C4, 消费信息ID: 29487e71-f232-4f45-a8d0-095a6440e2da
    当前消费者: C4, 消费信息ID: 96d6b082-c484-4e32-982a-36f2f55f83e2
    当前消费者: C5, 消费信息ID: 095447d6-4554-4f58-8f28-8322d1660df2
    当前消费者: C1, 消费信息ID: 8a259459-b287-4397-a83d-dec430b60022
    当前消费者: C7, 消费信息ID: 364e5a93-1cb0-4498-abda-aef19158ab4a
    当前消费者: C7, 消费信息ID: f2208e48-ece3-40b2-b4e8-a4f4e61d0cc7
    当前消费者: C0, 消费信息ID: d3a86b27-8b92-4e1b-ae4c-13bd057fd567
    当前消费者: C6, 消费信息ID: 1653faad-0b92-46e5-bebd-5b422b6db29a
    当前消费者: C3, 消费信息ID: 62d83e6b-021f-4f4f-aa1a-d244a745df3e
    当前消费者: C3, 消费信息ID: 520368f7-447e-45b7-b159-ca140039e1f1
    当前消费者: C3, 消费信息ID: 1456d28b-aae2-4261-96a0-b389c565037b
    当前消费者: C4, 消费信息ID: 8c68fb36-2885-4e25-8987-b711209721a8
    当前消费者: C1, 消费信息ID: 9a8f3ae3-451c-45e7-95cf-ad95722e6885
    当前消费者: C2, 消费信息ID: e921ee14-a035-4131-802c-86a3eb3a8802
    当前消费者: C1, 消费信息ID: c8379f89-c8af-4948-8e78-5e0c61d92727
    当前消费者: C7, 消费信息ID: 614e7566-b871-4445-875d-8b768fd4b693
    当前消费者: C5, 消费信息ID: 1226159e-5212-4000-90c3-cae48cf698e9
    当前消费者: C3, 消费信息ID: 39d6afa8-0313-44e5-86b2-36300c7f7d15
    当前消费者: C0, 消费信息ID: 70504a6e-eccb-4503-91df-508f54261380
    当前消费者: C2, 消费信息ID: b47d8aeb-850c-4df0-8280-667b6e49317f
    当前消费者: C6, 消费信息ID: d75a4d32-1949-46f1-9630-bffe7b3dcc5a
    当前消费者: C4, 消费信息ID: 7dc5db53-9614-457f-9d88-4b1a0f8ff7c9
    总任务数10000
    

    源码解析

    初始化

    我们可以通过创建Disruptor类和RingBuffer类实现功能,直接操作RingBuffer更加灵活也更麻烦。Disruptor类提供了操作RingBuffer和设置消费依赖的便捷API,如构建Ringbuffer、设置消费链、启动关闭Disruptor、暂停消费者、发布事件等。

    disruptor = new Disruptor<ExampleEvent>(
        new ExampleEventFactory(),  // 用于创建环形缓冲中对象的工厂
        8,  // 环形缓冲的大小
        threadFactory,  // 用于事件处理的线程工厂
        ProducerType.MULTI, // 生产者类型,单vs多生产者
        new BlockingWaitStrategy()); // 等待环形缓冲游标的等待策略,这里使用阻塞模式,也是Disruptor中唯一有锁的地方
    

    这里调用构造方法创建了一个Disruptor对象,实际上创建了一个RingBuffer对象和一个Executor,并将引入传入私有化的构造方法创建了Disruptor对象。

    // Disruptor.java
    public Disruptor(
            final EventFactory<T> eventFactory, // 用于创建环形缓冲中对象的工厂
            final int ringBufferSize, // 环形缓冲的大小
            final ThreadFactory threadFactory, // 用于事件处理的线程工厂
            final ProducerType producerType, // 生产者类型,单vs多生产者
            final WaitStrategy waitStrategy) // 等待环形缓冲游标的等待策略
    {
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }
    
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
    {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
    
    // RingBuffer.java
    public static <E> RingBuffer<E> create(
            ProducerType producerType,
            EventFactory<E> factory,
            int bufferSize,
            WaitStrategy waitStrategy)
        {
            switch (producerType) // 构建RingBuffer时通过producerType来区分单生产者或多生产者
            {
                case SINGLE:
                    return createSingleProducer(factory, bufferSize, waitStrategy);
                case MULTI:
                    return createMultiProducer(factory, bufferSize, waitStrategy);
                default:
                    throw new IllegalStateException(producerType.toString());
            }
        }
    
    // 单生产者模式创建RingBuffer
    public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    
        return new RingBuffer<E>(factory, sequencer);
    }
    
    // 多生产者模式创建RingBuffer
    public static <E> RingBuffer<E> createMultiProducer(
            EventFactory<E> factory,
            int bufferSize,
            WaitStrategy waitStrategy)
        {
            MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
    
            return new RingBuffer<E>(factory, sequencer);
        }
    
    // RingBuffer构造器
    RingBuffer(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        super(eventFactory, sequencer);
    }
    

    这里注意下,在构造RingBuffer时,需要传入用于创建事件对象的工厂eventFactory和记录生产者序号的sequencer。根据生产者是否是多线程生产,Sequencer又分为单、多生产者模式,后续还会讲到。

    RingBuffer的具体构造方法是在父类中。其中主要就是对容量的初始化和内存预加载。

        RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
            this.sequencer = sequencer;
            this.bufferSize = sequencer.getBufferSize();
            if (this.bufferSize < 1) {
                throw new IllegalArgumentException("bufferSize must not be less than 1");
            } else if (Integer.bitCount(this.bufferSize) != 1) {
                throw new IllegalArgumentException("bufferSize must be a power of 2");
            } else {
                this.indexMask = (long)(this.bufferSize - 1);
                //容量初始化
                this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
                //内存预加载的核心实现
                this.fill(eventFactory);
            }
        }
        
        private void fill(EventFactory<E> eventFactory) {
            for(int i = 0; i < this.bufferSize; ++i) {
                this.entries[BUFFER_PAD + i] = eventFactory.newInstance();
            }
        }
    

    构建Disruptor实例后,需要设置Disruptor的消费者,之后就可以调用disruptor.start();进行启动了。

        public RingBuffer<T> start() {
            this.checkOnlyStartedOnce();
            Iterator var1 = this.consumerRepository.iterator();
    
            while(var1.hasNext()) {
                ConsumerInfo consumerInfo = (ConsumerInfo)var1.next();
                consumerInfo.start(this.executor);
            }
    
            return this.ringBuffer;
        }
    

    这里就是循环调用每个消费者的start方法,启动各自的线程开始消费。

    生产者

    生产者一般就是我们的应用线程,在发布通常使用一个EventTranslator将数据转移到RingBuffer上,因为不涉及共享数据和实例变量,通常使用同一个EventTranslator实例进行操作。上面的例子中也有体现。

    public class TradePubshlisher implements Runnable {
        private CountDownLatch latch;
        private Disruptor<Trade> disruptor;
    
        private static int PUBLISH_COUNT = 1;
    
        public TradePubshlisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
            this.latch = latch;
            this.disruptor = disruptor;
        }
    
        @Override
        public void run() {
            //新的提交任务方式
            for(int i=0;i<PUBLISH_COUNT;i++) {
                disruptor.publishEvent(new TradeEventTranslator());
            }
            latch.countDown();
        }
    }
    
    class TradeEventTranslator implements EventTranslator<Trade> {
        private Random random = new Random();
    
        @Override
        public void translateTo(Trade trade, long l) {
            this.generateTrade(trade);
        }
    
        private void generateTrade(Trade event) {
            event.setPrice(random.nextDouble() * 9999);
        }
    }
    

    我们来看一下publishEvent方法的源码。

    public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg)
    {
        ringBuffer.publishEvent(eventTranslator, arg);
    }
    //之前也讲过,Disruptor这个类是一个辅助类,在发布事件时其实是委托给RingBuffer完成发布操作。
    //RingBuffer.publishEvent()的逻辑大概分为两个步骤:第一步先占有RingBuffer上的一个可用位置,我们简称为“占坑”;第二步在可用位置发布数据,我们简称为“填坑”。
    // RingBuffer
    public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
    {
        final long sequence = sequencer.next(); // 第一步 占坑
        translateAndPublish(translator, sequence, arg0); // 第二步 填坑
    }
    

    在第一步占坑中,首先通过调用Sequencer.next()获取RingBuffer实例下一个能用的序号。
    AbstractSequencer作为一个抽象类,实现了Sequencer接口,是单生产者Sequencer和多生产者Sequencer的父类。

    其中第二步中,在填坑完毕还要调用Sequencer接口的publish方法对外发布事件。

    Sequencer是Disruptor的核心,用来保证生产者和消费者之间正确、高速传递数据的。我们先来看看以生产者的角度看Sequencer有什么作用。
    在这里插入图片描述
    下边是Sequencer接口及其父接口Cursored、Sequenced 定义。

    interface Sequencer extends Cursored, Sequenced
    {
        /**
        * 序号开始位置
        */
        long INITIAL_CURSOR_VALUE = -1L;
    
        /**
        * 声明指定序号,只用在初始化RingBuffer到指定值,基本上不用了
        */
        void claim(long sequence);
    
        /**
        * 用非阻塞方式,确认某个序号是否已经发布且事件可用。
        */
        boolean isAvailable(long sequence);
    
        /**
        * 增加门控序列(消费者序列),用于生产者在生产时避免追尾消费者
        */
        void addGatingSequences(Sequence... gatingSequences);
    
        /**
        * 从门控序列中移除指定序列
        */
        boolean removeGatingSequence(Sequence sequence);
    
        /**
        * 消费者使用,用于追踪指定序列(通常是上一组消费者的序列)
        */
        SequenceBarrier newBarrier(Sequence... sequencesToTrack);
    
        /**
        * 获取追踪序列中最小的序列
        */
        long getMinimumSequence();
    
        /**
        * 获取能够从环形缓冲读取的最高的序列号。依赖Sequencer的实现,可能会扫描Sequencer的一些值。扫描从nextSequence
        * 到availableSequence。如果没有大于等于nextSequence的可用值,返回值将为nextSequence-1。为了工作正常,消费者
        * 应该传递一个比最后成功处理的序列值大1的值。
        */
        long getHighestPublishedSequence(long nextSequence, long availableSequence);
    
        <T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
    }
    
    // Cursored.java
    /**
    * 游标接口,用于获取生产者当前游标位置
    */
    public interface Cursored
    {
        /**
        * Get the current cursor value.
        *
        * @return current cursor value
        */
        long getCursor();
    }
    
    // Sequenced.java
    public interface Sequenced
    {
        /**
        * 获取环形缓冲的大小
        */
        int getBufferSize();
    
        /**
        * 判断是否含有指定的可用容量
        */
        boolean hasAvailableCapacity(final int requiredCapacity);
    
        /**
        * 剩余容量
        */
        long remainingCapacity();
    
        /**
        * 生产者发布时,申请下一个序号
        */
        long next();
    
        /**
        * 申请n个序号,用于批量发布
        */
        long next(int n);
    
        /**
        * next()的非阻塞模式
        */
        long tryNext() throws InsufficientCapacityException;
    
        /**
        * next(n)的非阻塞模式
        */
        long tryNext(int n) throws InsufficientCapacityException;
    
        /**
        * 数据填充后,发布此序号
        */
        void publish(long sequence);
    
        /**
        * 批量发布序号
        */
        void publish(long lo, long hi);
    }
    

    下边先看使用单生产者SingleProducerSequencer具体是怎么占坑的。

    // SingleProducerSequencer.java
    @Override
    public long next()
    {
        return next(1);
    }
    
    /**
    * @see Sequencer#next(int)
    */
    @Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        // 复制上次申请完毕的序列值
        long nextValue = this.nextValue;
        // 加n,得到本次需要申请的序列值,单个发送n为1
        long nextSequence = nextValue + n; // 本次要验证的值
        // 可能发生绕环的点,本次申请值 - 一圈长度
        long wrapPoint = nextSequence - bufferSize;
        long cachedGatingSequence = this.cachedValue; // 数值最小的序列值,也就是最慢消费者
        // wrapPoint 等于 cachedGatingSequence 将发生绕环行为,生产者将在环上,从后方覆盖未消费的事件。
        // 如果即将生产者超一圈从后方追消费者尾(要申请的序号落了最慢消费者一圈)或 消费者追生产者尾,将进行等待。后边这种情况应该不会发生吧?
        // 针对以上值举例:400米跑道(bufferSize),小明跑了599米(nextSequence),小红(最慢消费者)跑了200米(cachedGatingSequence)。小红不动,小明再跑一米就撞翻小红的那个点,叫做绕环点wrapPoint。
        // 没有空坑位,将进入循环等待。
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence
    
            long minSequence;
            // 只有当消费者消费,向前移动后,才能跳出循环
            // 由于外层判断使用的是缓存的消费者序列最小值,这里使用真实的消费者序列进行判断,并将最新结果在跳出while循环之后进行缓存
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {  // 唤醒等待的消费者
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }
            // 当消费者向前消费后,更新缓存的最小序号
            this.cachedValue = minSequence;
        }
        // 将成功申请的序号赋值给对象实例变量
        this.nextValue = nextSequence;
    
        return nextSequence;
    }
    

    这里主要就是生产者发布的序号要小于消费者最小的消费序号。next()占坑成功将会返回坑位号,回到RingBuffer的publishEvent方法,执行translateAndPublish方法,进行填坑和发布操作。

    // RingBuffer.java
    private void translateAndPublish(EventTranslator<E> translator, long sequence)
    {
        try
        {
            translator.translateTo(get(sequence), sequence);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }
    

    translator参数用户定义的对EventTranslator接口的实现对象,用来填充数据。

    // SingleProducerSequencer.java 
    @Override
    public void publish(long sequence)
    {  // 在发布此位置可用时,需要更新Sequencer内部游标值,并在使用阻塞等待策略时,通知等待可用事件的消费者进行继续消费
        cursor.set(sequence);
        // 除signalAllWhenBlocking外都是空实现
        waitStrategy.signalAllWhenBlocking();
    }
    

    在放好数据后,就可以调用sequencer的publish方法发布对象了。首先是更新当前游标,更新完毕再通知等待中的消费者,消费者将继续消费。

    // SingleProducerSequencer.java 
    @Override
    public void publish(long sequence)
    {  // 在发布此位置可用时,需要更新Sequencer内部游标值,并在使用阻塞等待策略时,通知等待可用事件的消费者进行继续消费
        cursor.set(sequence);
        // 除BlockingWaitStrategy外都是空实现
        waitStrategy.signalAllWhenBlocking();
    }
    

    如果使用的是多生产者,占坑则调用MultiProducerSequencer.next()。

    @Override
    public long next()
    {
        return next(1);
    }
    
    /**
    * @see Sequencer#next(int)
    */
    @Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
    
        long current;
        long next;
    
        do
        {
            current = cursor.get(); // 当前游标值,初始化时是-1
            next = current + n;
    
            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();
    
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
    
                if (wrapPoint > gatingSequence)
                {
                    waitStrategy.signalAllWhenBlocking();
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }
    
                gatingSequenceCache.set(gatingSequence);
            }
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);
    
        return next;
    }
    

    可以发现,多生产者模式占坑和放置数据的逻辑和单生产者模式区别不大。区别主要是最后调用publish发布坑位的逻辑。

    // MultiProducerSequencer.java
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); // 获取int[]数组类的第一个元素与该类起始位置的偏移。
    private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); // 每个元素需要占用的位置,也有可能返回0。BASE和SCALE都是为了操作availableBuffer
    
    private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    
    // availableBuffer tracks the state of each ringbuffer slot
    // see below for more details on the approach
    private final int[] availableBuffer; // 初始全是-1
    private final int indexMask;
    private final int indexShift;
    
    @Override
    public void publish(final long sequence)
    {
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking(); // 如果使用BlokingWaitStrategy,才会进行通知。否则不会操作
    }
    
    @Override
    public void publish(long lo, long hi)
    {
        for (long l = lo; l <= hi; l++)
        {
            setAvailable(l);
        }
        waitStrategy.signalAllWhenBlocking();
    }
    /
    * availableBuffer设置可用标志
    * 主要原因是避免发布者线程之间共享一个序列对象。
    * 游标和最小门控序列的差值应该永远不大于RingBuffer的大小(防止生产者太快,覆盖未消费完的数据)
    */
    private void setAvailable(final long sequence)
    { // calculateIndex 求模%, calculateAvailabilityFlag 求除/
        setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
    }
    
    private void setAvailableBufferValue(int index, int flag)
    {  // 使用Unsafe更新属性,因为是直接操作内存,所以需要计算元素位置对应的内存位置bufferAddress
        long bufferAddress = (index * SCALE) + BASE;
        // availableBuffer是标志可用位置的int数组,初始全为-1。随着sequence不断上升,buffer中固定位置的flag(也就是sequence和bufferSize相除的商)会一直增大。
        UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
    }
    
    private int calculateAvailabilityFlag(final long sequence)
    { // 求商 就是 sequence / bufferSize , bufferSize = 2^indexShift。
        return (int) (sequence >>> indexShift);
    }
    
    private int calculateIndex(final long sequence)
    { // 计算位置即求模,直接使用序号 与 掩码(2的平方-1,也就是一个全1的二进制表示),相当于 sequence % (bufferSize), bufferSize = indexMask + 1
        return ((int) sequence) & indexMask;
    }
    

    对比SingleProducerSequencer的publish,MultiProducerSequencer的publish没有设置cursor,而是将内部使用的availableBuffer数组对应位置进行设置。availableBuffer是一个记录RingBuffer槽位状态的数组,通过对序列值sequence取ringBuffer大小的模,获得槽位号,再通过与ringBuffer大小相除,获取序列值所在的圈数,进行设置。这里没有直接使用模运算和触发运算,而使用更高效的位与和右移操作。
    其他的操作,MultiProducerSequencer和SingleProducerSequencer类似,就不再赘述了。

    消费者

    队列和Disruptor在表现行为上最大的区别。队列中的一个事件只能被一个消费者消费,而Disruptor中的事件会发布给所有消费者。特别适合同一数据的独立并行处理操作。

    消费者依赖图(消费链):同一事件需要被多个消费者消费时,消费者之间可能有依赖关系,如消费者A,B,C,B和C依赖A先执行,但是B和C可以并行消费。

    Disruptor的消费者依赖EventProcessor循环处理可用事件。EventProcessor顾名思义,就是事件处理器(handle和process都可以翻译为“处理”,但是process侧重于机器的处理,而handle侧重于有人工的处理,所以使用handle表示用户逻辑的处理,使用process表示机器的处理),这个接口有两个实现类,分别是WorkProcessor和BatchEventProcessor,它们对应的逻辑处理消费者分别是EventHandler和WorkHandler。下面是EventProcessor的UML类图及EventHandler和EventProcessor的接口定义。
    在这里插入图片描述
    EventProcessor接口继承了Runnable接口,主要有两种实现:单线程批量处理BatchEventProcessor和多线程处理WorkProcessor。

    在使用Disruptor帮助类构建消费者时,使用handleEventsWith方法传入多个EventHandler,内部使用多个BatchEventProcessor关联多个线程执行。这种情况类似JMS中的发布订阅模式,同一事件会被多个消费者并行消费。适用于同一事件触发多种操作。

    而使用Disruptor的handleEventsWithWorkerPool传入多个WorkHandler时,内部使用多个WorkProcessor关联多个线程执行。这种情况类似JMS的点对点模式,同一事件会被一组消费者其中之一消费。适用于提升消费者并行处理能力。

    BatchEventProcessor主要用于处理单线程并行任务,同一消费者组的不同消费者会接收相同的事件,并在所有事件处理完毕后进入下一消费者组进行处理(是不是类似JUC里的Phaser、CyclicBarrier或CountDownLatch呢)。WorkProcessor通过WorkerPool管理多个WorkProcessor,达到多线程处理事件的目的,同一消费者组的多个WorkProcessor不会处理同一个事件。通过选择不同的WaitStragegy实现,可以控制消费者在没有可用事件处理时的等待策略。

    RingBuffer

    /*
     * 填充辅助类,为解决缓存的伪共享问题,需要对每个缓存行(64B)进行填充
     */
    abstract class RingBufferPad
    { // https://github.com/LMAX-Exchange/disruptor/issues/167
        /*
        RingBufferFields中的属性被频繁读取,这里的属性是为了避免RingBufferFields遇到伪共享问题
         */
        protected long p1, p2, p3, p4, p5, p6, p7;
    }
    
    abstract class RingBufferFields<E> extends RingBufferPad {
        private static final int BUFFER_PAD; // 用于在数组中进行缓存行填充的空元素个数
        private static final long REF_ARRAY_BASE; // 内存中引用数组的开始元素基地址,是数组开始的地址+BUFFER_PAD个元素的偏移量之和,后续元素的内存地址需要在此基础计算地址
        private static final int REF_ELEMENT_SHIFT; // 引用元素的位移量,用于计算BUFFER_PAD偏移量,基于位移计算比乘法运算更高效
        private static final Unsafe UNSAFE = Util.getUnsafe(); // 上面的变量都是为了UNSAFE的操作
    
        static {
            final int scale = UNSAFE.arrayIndexScale(Object[].class); // arrayIndexScale获取数组中一个元素占用的字节数,不同JVM实现可能有不同的大小
            if (4 == scale) {
                REF_ELEMENT_SHIFT = 2;
            } else if (8 == scale) {
                REF_ELEMENT_SHIFT = 3;
            } else {
                throw new IllegalStateException("Unknown pointer size");
            }
            BUFFER_PAD = 128 / scale; // BUFFER_PAD=32 or 16,为什么是128呢?是为了满足处理器的缓存行预取功能(Adjacent Cache-Line Prefetch)
            // https://github.com/LMAX-Exchange/disruptor/issues/158
            // https://software.intel.com/en-us/articles/optimizing-application-performance-on-intel-coret-microarchitecture-using-hardware-implemented-prefetchers
            // Including the buffer pad in the array base offset
            // BUFFER_PAD << REF_ELEMENT_SHIFT 实际上是BUFFER_PAD * scale的等价高效计算方式
            REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
        }
    
        private final long indexMask; // 用于进行 & 位与操作,实现高效的模操作
        private final Object[] entries;
        protected final int bufferSize;
        protected final Sequencer sequencer; // 生产者序列号
        // 省略...
    }
    public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
    {
        public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; // 游标初始值 -1
        protected long p1, p2, p3, p4, p5, p6, p7;
    }
    

    这里我们可以把Disruptor中的RingBuffer简单地理解为一个经过特殊优化的数组。
    这个“特殊的数组”的特别之处在于:

    • 尽可能消除缓存的伪共享问题;
    • 使用数组存储,预先分配(尽可能)连续的内存地址,非常适合FIFO的时序消息特性,充分利用CPU Cache预取能力;
    • 对象重用,减少不必要的GC;

    WaitStrategy

    • BlockingWaitStrategy:加锁,CPU资源紧缺,吞吐量和延迟并不重要的场景
      BusySpinWaitStrategy:自旋,通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
      PhasedBackoffWaitStrategy:自旋 + yield + 自定义策略,CPU资源紧缺,吞吐量和延迟并不重要的场景
      SleepingWaitStrategy:自旋 + yield + sleep,性能和CPU资源之间有很好的折中。延迟不均匀
      TimeoutBlockingWaitStrategy:加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
      YieldingWaitStrategy:自旋 + yield + 自旋,性能和CPU资源之间有很好的折中。延迟比较均匀
    更多相关内容
  • 基于Disruptor的Spring Boot Starter实现,初步事件推导,处理封装 1,事件推动 a,配置简单,少量配置即可实现初始化事件推送 2,事件处理 a、配置简单,少量配置即可实现异步事件处理 b、组件实现了基于责任链的...
  • disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
  • disruptor-3.3.6.jar

    2022-02-24 06:23:07
    java运行依赖jar包
  • Disruptor-cpp 总览 Disruptor-cpp是功能齐全的C ++端口。 实现java Disruptor v3.3.7中可用的所有功能。 建造 编译器 Clang 3.8或更高版本 GCC 5.0或更高版本 Microsoft Visual C ++ 2015或更高版本 Linux 必须在...
  • Conversant ConcurrentQueue、Disruptor BlockingQueue 和 ConcurrentStack Disruptor是Java中性能最高的线程内传输机制。 Conversant Disruptor 是这种环形缓冲区中性能最高的实现,因为它几乎没有开销,并且采用了...
  • 卡夫卡消费者的破坏者演示如何在Kafka 0.9 Consumer上使用LMAX Disruptor 好处->一旦先前的使用者完全处理完消息,便可以使用序列屏障来提交消息。想象力是极限。如果环形缓冲区可以容纳在L3缓存中,则处理速度会更...
  • 并发框架Disruptor介绍Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑...
  • 赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
  • Okra Okra是一个简单的使用JAVA开发的高性能,高扩展,高并发,低延迟的服务器框架。 主要目的是帮助中小团队快速开发实现网络游戏服务端。 本项目包含完整的MMORPG等游戏服务器的DEMO. Dependencies: ...
  • 赠送jar包:disruptor-3.3.7.jar; 赠送原API文档:disruptor-3.3.7-javadoc.jar; 赠送源代码:disruptor-3.3.7-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.7.pom; 包含翻译后的API文档:disruptor-...
  • 赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...
  • disruptor-3.3.8.jar

    2018-10-18 11:27:37
    Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
  • 赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
  • 并发框架Disruptor

    2017-11-21 10:19:14
    Java并发框架Disruptor,里面采取环形缓存结构,速度更快,适用于生产者消费者模式
  • Disruptor.Net:破坏者

    2021-06-29 12:30:31
    Disruptor.Net VS.Net 版本的高性能线程间消息传递库。 VS2010-Net 4.0 对于 Visual Studio 2010 并使用 .Net framework 4.0。 VS2013-Net 4.5 对于 Visual Studio 2013 并使用 .Net framework 4.5。 LMAX ...
  • Disruptor示例

    2018-10-19 12:30:56
    业务逻辑处理器的核心是DisruptorDisruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 Disruptor是一个高性能的异步处理框架,或者可以认为是最...
  • LMAX Disruptor环形缓冲区示例 使用LMAX破坏者框架的示例: 受此博客文章的启发: 这是“钻石配置”的极其简化的版本,如以下内容所述: 在此实现中,日志记录和复制步骤同时发生,并且都必须成功才能执行发布...
  • disruptor-3.2.0.jar

    2018-03-14 15:52:59
    disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载
  • 破坏者4cpp LMAX干扰器的... 例如, $ git clone https://github.com/alexleemanfui/disruptor4cpp.git$ cd disruptor4cpp$ mkdir /opt/disruptor4cpp/$ cp -pr include/ /opt/disruptor4cpp/要运行测试, $ git clone ...
  • 212,598 ops/secRun 1, Disruptor=51,921,079 ops/secRun 2, Disruptor=50,864,699 ops/secRun 3, Disruptor=59,630,292 ops/secRun 4, Disruptor=62,227,753 ops/secRun 5, Disruptor=59,988,002 ops/secRun 6, ...
  • disruptor-3.4.2.jar

    2019-04-21 18:04:11
    disruptor-3.4.2.jar, 工具jar包, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作
  • Disruptor概述这是LMAX Disruptor进入Go编程语言的端口。 它保留了Disruptor的本质和精神,并使用了许多相同的抽象和概念,但并没有保持相同的Disruptor概述。这是LMAX Disruptor移植到Go编程语言中的移植。 它保留...
  • LMAX-Disruptor框架jar包

    2017-12-28 23:02:33
    Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。
  • Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
  • async-framework是基于google开源框架Disruptor开发的一个异步流程处理框架,关于Disruptor的介绍请参考 async-framework提供了流程和队列的概念,流程 Flow 代表步骤,队列 Queue 代表处理节点,队列由Disruptor...
  • tiny_disruptor 简化的实践破坏者
  • disruptor-demo希望想使用disruptor的同学可以通过这些demo尽快上手
  • disruptor框架案例.rar

    2019-12-04 22:07:50
    Disruptor它是一个开源的并发框架能够在无锁的情况下实现网络的Queue并发操作。同时,Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者...
  • 本篇文章主要介绍了spring与disruptor集成的简单示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 10,521
精华内容 4,208
关键字:

disruptor