精华内容
下载资源
问答
  • JAVA 并发框架 API 一览 一执行框程序 (Executor) 最常见的用法就是用 Executors 来构造相关的线程池用 CompletionService 来分离生产任务和已经完成的任务 生产者 submit 执行的任务使用者 take 已完成的任务并按照...
  • 原标题:Java并发编程 : Executor线程池框架一、Executor框架简介1、基础简介Executor系统中,将线程任务提交和任务执行进行了解耦的设计,Executor有各种功能强大的实现类,提供便捷方式来提交任务并且获取任务执行...

    原标题:Java并发编程 : Executor线程池框架

    一、Executor框架简介

    1、基础简介

    Executor系统中,将线程任务提交和任务执行进行了解耦的设计,Executor有各种功能强大的实现类,提供便捷方式来提交任务并且获取任务执行结果,封装了任务执行的过程,不再需要Thread().start()方式,显式创建线程并关联执行任务。

    2、调度模型

    线程被一对一映射为服务所在操作系统线程,启动时会创建一个操作系统线程;当该线程终止时,这个操作系统线程也会被回收。

    f297dc4a8745dc6065c880f9e16a86d0.png

    二、核心API结构

    Executor框架包含的核心接口和主要的实现类如下图所示:

    a46778597823d508c746ad8e6667dcc5.png

    线程池任务:核心接口:Runnable、Callable接口和接口实现类;

    任务的结果:接口Future和实现类FutureTask;

    任务的执行:核心接口Executor和ExecutorService接口。在Executor框架中有两个核心类实现了ExecutorService接口,ThreadPoolExecutor和ScheduledThreadPoolExecutor。

    1c77b79962e3a4266c71b50e995ac242.png

    流程分析

    线程池中线程数小于corePoolSize时,新任务将创建一个新线程执行任务,不论此时线程池中存在空闲线程;

    线程池中线程数达到corePoolSize时,新任务将被放入workQueue中,等待线程池中任务调度执行;

    当workQueue已满,且maximumPoolSize>corePoolSize时,新任务会创建新线程执行任务;

    当workQueue已满,且提交任务数超过maximumPoolSize,任务由RejectedExecutionHandler处理;

    当线程池中线程数超过corePoolSize,且超过这部分的空闲时间达到keepAliveTime时,回收该线程;

    如果设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将回收;

    三、线程池应用

    应用场景:批量账户和密码的校验任务,在实际的业务中算比较常见的,通过初始化线程池,把任务提交执行,最后拿到处理结果,这就是线程池使用的核心思想:节省资源提升效率。

    线程池主要用来解决线程生命周期开销问题和资源不足问题,通过线程池对多个任务线程重复使用,线程创建也被分摊到多个任务上,多数任务提交就有空闲的线程可以使用,所以消除线程频繁创建带来的开销。返回搜狐,查看更多

    责任编辑:

    展开全文
  • Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且...

    Disruptor是什么?
    Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后再交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应用了disruptor来实现worker内部threads的通信)。本文将使用disruptor最新版3.3.6进行介绍,可以在
    https://github.com/LMAX-Exchange/disruptor/releases 下载最新的JAR包开始disruptor之旅吧。

    轮胎:RingBuffer

    RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节。如下图所示:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bV2qKzfA-1591960121369)(https://upload-images.jianshu.io/upload_images/22478635-fa10d4054e44f757?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    数组

    这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多。

    序号

    RingBuffer中元素拥有序号的概念,并且序号是一直增长的,这怎么理解?比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素。也即是说通过 序号%SIZE 来定位元素,实现set/get操作。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收。

    由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方。

    无锁的机制

    在生产者/消费者模式下,disruptor号称“无锁并行框架”(要知道BlockingQueue是利用了Lock锁机制来实现的),这是怎么做到的呢?下面我们来具体分析下:

    一个生产者 + 一个消费者

    生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小。

    一个生产者 + 多个消费者

    多个消费者当然持有多个消费指针C1,C2,…,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念。此时也不需要进行锁定。

    多个生产者 + N个消费者

    很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁。

    Disruptor初体验:简单的生产者和消费者

    业务数据对象POJO(Event)

    public class Order {
        /* 订单ID */
        private long id;

        /* 订单信息 */
        private String info;

        /* 订单价格 */
        private double price;

        public long getId()
        {
            return(id);
        }


        public void setId( long id )
        {
            this.id = id;
        }


        public String getInfo()
        {
            return(info);
        }


        public void setInfo( String info )
        {
            this.info = info;
        }


        public double getPrice()
        {
            return(price);
        }


        public void setPrice( double price )
        {
            this.price = price;
        }
    }

    业务数据工厂(Factory)

    public class OrderFactory implements EventFactory {
        @Override
        public Object newInstance()
        {
            System.out.println( "OrderFactory.newInstance" );
            return(new Order() );
        }
    }

    事件处理器(Handler,即消费者处理逻辑)

    public class OrderHandler implements EventHandler<Order>{
        @Override
        public void onEvent( Order order, long l, boolean b ) throws Exception
        {
            System.out.println( Thread.currentThread().getName() + " 消费者处理中:" + l );
            order.setInfo( "info" + order.getId() );
            order.setPrice( Math.random() );
        }
    }

    Main

    public class Main {
        public static void main( String[] args ) throws InterruptedException
        {
            /* 创建订单工厂 */
            OrderFactory orderFactory = new OrderFactory();

            /* ringbuffer的大小 */
            int RINGBUFFER_SIZE = 1024;

            /* 创建disruptor */
            Disruptor<Order> disruptor = new Disruptor<Order>( orderFactory, RINGBUFFER_SIZE, Executors.defaultThreadFactory() );

            /* 设置事件处理器 即消费者 */
            disruptor.handleEventsWith( new OrderHandler() );

            disruptor.start();

            RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();

            /* -------------生产数据 */
            for ( int i = 0; i < 3; i++ )
            {
                long sequence = ringBuffer.next();

                Order order = ringBuffer.get( sequence );

                order.setId( i );

                ringBuffer.publish( sequence );
                System.out.println( Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i );
            }

            Thread.sleep( 1000 );

            disruptor.shutdown();
        }
    }

    运行结果:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V25Xei7s-1591960121372)(https://upload-images.jianshu.io/upload_images/22478635-fc42aec1fcc42b38?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    说明:

    其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象。

    另外在构造Disruptor的时候,在3.3.6之前使用的是API:

    到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了。

    构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者)。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xosRRHuN-1591960121378)(https://upload-images.jianshu.io/upload_images/22478635-2569d715193c3998?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    单独使用RingBuffer:WorkerPool

    如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能。

    public static void main( String[] args ) throws InterruptedException
    {
        ExecutorService        executor    = Executors.newFixedThreadPool( 3 );
        RingBuffer<Order>    ringBuffer    = RingBuffer.create( ProducerType.SINGLE, new OrderFactory(), 1024, new YieldingWaitStrategy() );
        WorkerPool<Order>    workerPool    = new WorkerPool<Order>( ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), new OrderHandler() );

        workerPool.start( executor );

        /* -------------生产数据 */
        for ( int i = 0; i < 30; i++ )
        {
            long sequence = ringBuffer.next();

            Order order = ringBuffer.get( sequence );
            order.setId( i );

            ringBuffer.publish( sequence );

            System.out.println( Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i );
        }

        Thread.sleep( 1000 );

        workerPool.halt();
        executor.shutdown();
    }

    实际上是利用WorkerPool辅助连接消费者。

    一个生产者+多个消费者

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wboXIggd-1591960121380)(https://upload-images.jianshu.io/upload_images/22478635-764f278a5cf381cb?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    public static void main( String[] args ) throws InterruptedException
    {
        /* 创建订单工厂 */
        OrderFactory orderFactory = new OrderFactory();

        /* ringbuffer的大小 */
        int RINGBUFFER_SIZE = 1024;

        /* 创建disruptor */
        Disruptor<Order> disruptor = new Disruptor<Order>( orderFactory, RINGBUFFER_SIZE, Executors.defaultThreadFactory() );

        /* 设置事件处理器 即消费者 */
        EventHandlerGroup<Order> eventHandlerGroup = disruptor.handleEventsWith( new OrderHandler(), new OrderHandler2() );
        eventHandlerGroup.then( new OrderHandler3() );
        disruptor.start();

        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();

        /* -------------生产数据 */
        for ( int i = 0; i < 3; i++ )
        {
            long sequence = ringBuffer.next();

            Order order = ringBuffer.get( sequence );

            order.setId( i );

            ringBuffer.publish( sequence );
            System.out.println( Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i );
        }

        Thread.sleep( 1000 );
        disruptor.shutdown();
    }

    运行结果:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lbaEDZcZ-1591960121382)(https://upload-images.jianshu.io/upload_images/22478635-00beb3bb71654ffa?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    生产者生产了3条消息,一个消费者线程1消费了这3条数据,另一个消费者线程2也消费了这3条数据,2者是并行的,待消费者线程1和2完毕后,3条数据交给消费者线程3处理。

    如果我们想顺序的按照A->B->C呢?

        public class Order {
        /* 订单ID */
        private long id;

        /* 订单信息 */
        private String info;

        /* 订单价格 */
        private double price;

        public long getId()
        {
            return(id);
        }


        public void setId( long id )
        {
            this.id = id;
        }


        public String getInfo()
        {
            return(info);
        }


        public void setInfo( String info )
        {
            this.info = info;
        }


        public double getPrice()
        {
            return(price);
        }


        public void setPrice( double price )
        {
            this.price = price;
        }
    }

    如果我们想六边形操作呢?

         Handler1    h1    = new Handler1();
    Handler2    h2    = new Handler2();
    Handler3    h3    = new Handler3();
    Handler4    h4    = new Handler4();
    Handler5    h5    = new Handler5();
    disruptor.handleEventsWith( h1, h2 );
    disruptor.after( h1 ).handleEventsWith( h4 );
    disruptor.after( h2 ).handleEventsWith( h5 );
    disruptor.after( h4, h5 ).handleEventsWith( h3 );

    到这里相信你对Disruptor已经有所了解了,那么多个生产者多个消费者如何实现呢,其实和上面的代码非常类似,无非是多个生产者都持有RingBuffer可以publish而已。
     

    展开全文
  • Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且...

    Disruptor是什么?

    Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后再交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应用了disruptor来实现worker内部threads的通信)。本文将使用disruptor最新版3.3.6进行介绍,可以在

    https://github.com/LMAX-Exchange/disruptor/releases 下载最新的JAR包开始disruptor之旅吧。

    轮胎:RingBuffer

    RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节。如下图所示:

    数组

    这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多。

    序号

    RingBuffer中元素拥有序号的概念,并且序号是一直增长的,这怎么理解?比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素。也即是说通过 序号%SIZE 来定位元素,实现set/get操作。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收。

    由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方。

    无锁的机制

    在生产者/消费者模式下,disruptor号称“无锁并行框架”(要知道BlockingQueue是利用了Lock锁机制来实现的),这是怎么做到的呢?下面我们来具体分析下:

    一个生产者 + 一个消费者

    生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小。

    一个生产者 + 多个消费者

    多个消费者当然持有多个消费指针C1,C2,...,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念。此时也不需要进行锁定。

    多个生产者 + N个消费者

    很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁。

    Disruptor初体验:简单的生产者和消费者

    业务数据对象POJO(Event)

    public class Order {

    //订单ID

    private long id;

    //订单信息

    private String info;

    //订单价格

    private double price;

    public long getId() {

    return id;

    }

    public void setId(long id) {

    this.id = id;

    }

    public String getInfo() {

    return info;

    }

    public void setInfo(String info) {

    this.info = info;

    }

    public double getPrice() {

    return price;

    }

    public void setPrice(double price) {

    this.price = price;

    }

    }

    业务数据工厂(Factory)

    public class OrderFactory implements EventFactory{

    @Override

    public Object newInstance() {

    System.out.println("OrderFactory.newInstance");

    return new Order();

    }

    }

    事件处理器(Handler,即消费者处理逻辑)

    public class OrderHandler implements EventHandler{

    @Override

    public void onEvent(Order order, long l, boolean b) throws Exception {

    System.out.println(Thread.currentThread().getName() + " 消费者处理中:" + l);

    order.setInfo("info" + order.getId());

    order.setPrice(Math.random());

    }

    }

    Main

    public class Main {

    public static void main(String[] args) throws InterruptedException {

    //创建订单工厂

    OrderFactory orderFactory = new OrderFactory();

    //ringbuffer的大小

    int RINGBUFFER_SIZE = 1024;

    //创建disruptor

    Disruptor disruptor = new Disruptor(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());

    //设置事件处理器 即消费者

    disruptor.handleEventsWith(new OrderHandler());

    disruptor.start();

    RingBuffer ringBuffer = disruptor.getRingBuffer();

    //-------------生产数据

    for(int i = 0 ; i < 3 ; i++){

    long sequence = ringBuffer.next();

    Order order = ringBuffer.get(sequence);

    order.setId(i);

    ringBuffer.publish(sequence);

    System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);

    }

    Thread.sleep(1000);

    disruptor.shutdown();

    }

    }

    运行结果:

    说明:

    其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象。

    另外在构造Disruptor的时候,在3.3.6之前使用的是API:

    到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了。

    构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者)。

    单独使用RingBuffer:WorkerPool

    如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能。

    public static void main(String[] args) throws InterruptedException {

    ExecutorService executor = Executors.newFixedThreadPool(3);

    RingBuffer ringBuffer = RingBuffer.create(ProducerType.SINGLE,new OrderFactory(),1024,new YieldingWaitStrategy());

    WorkerPool workerPool = new WorkerPool(ringBuffer,ringBuffer.newBarrier(),new IgnoreExceptionHandler(),new OrderHandler());

    workerPool.start(executor);

    //-------------生产数据

    for(int i = 0 ; i < 30 ; i++){

    long sequence = ringBuffer.next();

    Order order = ringBuffer.get(sequence);

    order.setId(i);

    ringBuffer.publish(sequence);

    System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);

    }

    Thread.sleep(1000);

    workerPool.halt();

    executor.shutdown();

    }

    实际上是利用WorkerPool辅助连接消费者。

    一个生产者+多个消费者

    public static void main(String[] args) throws InterruptedException {

    //创建订单工厂

    OrderFactory orderFactory = new OrderFactory();

    //ringbuffer的大小

    int RINGBUFFER_SIZE = 1024;

    //创建disruptor

    Disruptor disruptor = new Disruptor(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());

    //设置事件处理器 即消费者

    EventHandlerGroup eventHandlerGroup = disruptor.handleEventsWith(new OrderHandler(),new OrderHandler2());

    eventHandlerGroup.then(new OrderHandler3());

    disruptor.start();

    RingBuffer ringBuffer = disruptor.getRingBuffer();

    //-------------生产数据

    for(int i = 0 ; i < 3 ; i++){

    long sequence = ringBuffer.next();

    Order order = ringBuffer.get(sequence);

    order.setId(i);

    ringBuffer.publish(sequence);

    System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);

    }

    Thread.sleep(1000);

    disruptor.shutdown();

    }

    运行结果:

    生产者生产了3条消息,一个消费者线程1消费了这3条数据,另一个消费者线程2也消费了这3条数据,2者是并行的,待消费者线程1和2完毕后,3条数据交给消费者线程3处理。

    如果我们想顺序的按照A->B->C呢?

    disruptor.handleEventsWith(new Handler1()).

    handleEventsWith(new Handler2()).

    handleEventsWith(new Handler3());

    如果我们想六边形操作呢?

    Handler1 h1 = new Handler1();

    Handler2 h2 = new Handler2();

    Handler3 h3 = new Handler3();

    Handler4 h4 = new Handler4();

    Handler5 h5 = new Handler5();

    disruptor.handleEventsWith(h1, h2);

    disruptor.after(h1).handleEventsWith(h4);

    disruptor.after(h2).handleEventsWith(h5);

    disruptor.after(h4, h5).handleEventsWith(h3);

    到这里相信你对Disruptor已经有所了解了,那么多个生产者多个消费者如何实现呢,其实和上面的代码非常类似,无非是多个生产者都持有RingBuffer可以publish而已。

    展开全文
  • Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且...

    Disruptor是什么?

    Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后在交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应用了disruptor来实现worker内部threads的通信)。本文将使用disruptor最新版3.3.6进行介绍,可以在https://github.com/LMAX-Exchange/disruptor/releases下载最新的JAR包开始disruptor之旅吧。

    轮胎:RingBuffer

    RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节。如下图所示:

    270b4eb11f7cb766513107643589a0cd.png

    数组

    这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多。

    序号

    RingBuffer中元素拥有序号的概念,并且序号是一直增长的,这怎么理解?比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素。也即是说通过 序号%SIZE 来定位元素,实现set/get操作。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收。

    由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方。

    无锁的机制

    在生产者/消费者模式下,disruptor号称“无锁并行框架”(要知道BlockingQueue是利用了Lock锁机制来实现的),这是怎么做到的呢?下面我们来具体分析下:

    一个生产者 + 一个消费者

    生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小。

    一个生产者 + 多个消费者

    多个消费者当然持有多个消费指针C1,C2,...,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念。此时也不需要进行锁定。

    多个生产者 + N个消费者

    很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁。

    Disruptor初体验:简单的生产者和消费者

    业务数据对象POJO(Event)public class Order {

    //订单ID

    private long id;

    //订单信息

    private String info;

    //订单价格

    private double price;

    public long getId() {

    return id;

    }

    public void setId(long id) {

    this.id = id;

    }

    public String getInfo() {

    return info;

    }

    public void setInfo(String info) {

    this.info = info;

    }

    public double getPrice() {

    return price;

    }

    public void setPrice(double price) {

    this.price = price;

    }

    }

    业务数据工厂(Factory)public class OrderFactory implements EventFactory{

    @Override

    public Object newInstance() {

    System.out.println("OrderFactory.newInstance");

    return new Order();

    }

    }

    事件处理器(Handler,即消费者处理逻辑)public class OrderHandler implements EventHandler{

    @Override

    public void onEvent(Order order, long l, boolean b) throws Exception {

    System.out.println(Thread.currentThread().getName() + " 消费者处理中:" + l);

    order.setInfo("info" + order.getId());

    order.setPrice(Math.random());

    }

    }

    Mainpublic class Main {

    public static void main(String[] args) throws InterruptedException {

    //创建订单工厂

    OrderFactory orderFactory = new OrderFactory();

    //ringbuffer的大小

    int RINGBUFFER_SIZE = 1024;

    //创建disruptor

    Disruptor disruptor = new Disruptor(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());

    //设置事件处理器 即消费者

    disruptor.handleEventsWith(new OrderHandler());

    disruptor.start();

    RingBuffer ringBuffer = disruptor.getRingBuffer();

    //-------------生产数据

    for(int i = 0 ; i 

    long sequence = ringBuffer.next();

    Order order = ringBuffer.get(sequence);

    order.setId(i);

    ringBuffer.publish(sequence);

    System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);

    }

    Thread.sleep(1000);

    disruptor.shutdown();

    }

    }

    运行结果:

    fd8f10df9d4664820d3a19e1110542fd.png

    说明:

    其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象。

    另外在构造Disruptor的时候,在3.3.6之前使用的是API:

    cfe1454486406ab66c659ed5ea03e602.png

    到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了。

    构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者)。

    c5b476cf0c9df2bc78587bfeb602e6b3.png

    单独使用RingBuffer:WorkerPool

    如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能。public static void main(String[] args) throws InterruptedException {

    ExecutorService executor = Executors.newFixedThreadPool(3);

    RingBuffer ringBuffer = RingBuffer.create(ProducerType.SINGLE,new OrderFactory(),1024,new YieldingWaitStrategy());

    WorkerPool workerPool = new WorkerPool(ringBuffer,ringBuffer.newBarrier(),new IgnoreExceptionHandler(),new OrderHandler());

    workerPool.start(executor);

    //-------------生产数据

    for(int i = 0 ; i 

    long sequence = ringBuffer.next();

    Order order = ringBuffer.get(sequence);

    order.setId(i);

    ringBuffer.publish(sequence);

    System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);

    }

    Thread.sleep(1000);

    workerPool.halt();

    executor.shutdown();

    }

    实际上是利用WorkerPool辅助连接消费者。

    一个生产者+多个消费者

    476730cf6f0bb3e4fdb539b8be7d7464.pngpublic static void main(String[] args) throws InterruptedException {

    //创建订单工厂

    OrderFactory orderFactory = new OrderFactory();

    //ringbuffer的大小

    int RINGBUFFER_SIZE = 1024;

    //创建disruptor

    Disruptor disruptor = new Disruptor(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());

    //设置事件处理器 即消费者

    EventHandlerGroup eventHandlerGroup = disruptor.handleEventsWith(new OrderHandler(),new OrderHandler2());

    eventHandlerGroup.then(new OrderHandler3());

    disruptor.start();

    RingBuffer ringBuffer = disruptor.getRingBuffer();

    //-------------生产数据

    for(int i = 0 ; i 

    long sequence = ringBuffer.next();

    Order order = ringBuffer.get(sequence);

    order.setId(i);

    ringBuffer.publish(sequence);

    System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);

    }

    Thread.sleep(1000);

    disruptor.shutdown();

    }

    运行结果:

    103d91f9c12c7033a888056fb304a190.png

    生产者生产了3条消息,一个消费者线程1消费了这3条数据,另一个消费者线程2也消费了这3条数据,2者是并行的,待消费者线程1和2完毕后,3条数据交给消费者线程3处理。

    如果我们想顺序的按照A->B->C呢?disruptor.handleEventsWith(new Handler1()).

    handleEventsWith(new Handler2()).

    handleEventsWith(new Handler3());

    如果我们想六边形操作呢?

    14461d4e75ff377226b0f1825c605181.pngHandler1 h1 = new Handler1();

    Handler2 h2 = new Handler2();

    Handler3 h3 = new Handler3();

    Handler4 h4 = new Handler4();

    Handler5 h5 = new Handler5();

    disruptor.handleEventsWith(h1, h2);

    disruptor.after(h1).handleEventsWith(h4);

    disruptor.after(h2).handleEventsWith(h5);

    disruptor.after(h4, h5).handleEventsWith(h3);

    到这里相信你对Disruptor已经有所了解了,那么多个生产者多个消费者如何实现呢,其实和上面的代码非常类似,无非是多个生产者都持有RingBuffer可以publish而已。

    展开全文
  • 转自:https://blog.csdn.net/MonkeyDCoding/article/details/813696100.源代码github-简易高并发框架注:本篇博客知识来自于网课。1.问题来源以及w对于一个题库系统。考试组要有批量的离线文档要生成。题库组批量的...
  • Java 并发编程框架(一)在Java1.5之前,编写多线程并非易事,那么编写多线程为啥不想想象的那么简单,为什么需要线程池?先来回答这个问题。Why Thread Pool在Java中,如果每当一个请求到达就创建一个新线程,开销...
  • Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且...
  • Java并发编程高阶技术 高性能并发框架源码解析与实战全网唯一深度解析并发编程框架disruptor底层源码课程,助你成为并发编程高手,拿下高薪。高性能Java并发框架disruptor源码解析与实战什么是Disruptor?它一个高...
  • 第1章 课程介绍(Java并发编程进阶课程)什么是Disruptor?它一个高性能的异步处理框架,号称“单线程每秒可处理600W个订单”的神器,本课程目标:彻底精通一个如此优秀的开源框架,面试秒杀面试官。本章会带领小伙伴...
  • 文章目录什么是 Fork/Join 框架工作窃取算法Fork/Join 框架的设计使用 Fork/Join 框架Fork/Join 框架的异常处理Fork/Join 框架的实现原理参考 什么是 Fork/Join 框架 ...《Java 并发编程的艺术》 ...
  • Thrift框架服务端并发处理模式的java示例2015-10-16 11:05:07作者:MangoCool来源:MangoCool项目因为需要对外提供不同语言的接口,所以我们采用了高效、跨语言的RPC框架Thrift。因为用的爽!顺理成章继续沿用,但是...
  • 在JDK中提供了丰富的集合框架工具,这些工具可以有效地对数据进行处理
  • 鸟瞰 Java 并发框架

    2019-08-31 14:41:06
    分析并发框架的示例用例 3. 快速更新线程配置 4. 性能测试结果 5. 使用执行器服务并行化 IO 任务 6. 使用执行器服务并行化 IO 任务(CompletableFuture) 7. 使用 ExecutorService 并行处理...
  • 第1章 课程介绍(Java并发编程进阶课程)什么是Disruptor?它一个高性能的异步处理框架,号称“单线程每秒可处理600W个订单”的神器,本课程目标:彻底精通一个如此优秀的开源框架,面试秒杀面试官。本章会带领小伙伴...
  • Java并发编程 Executor框架 1. 在线程中执行任务 在理想情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态、结果或者边界效应,独立性有助于实现并发,因为如果存在足够多的处理资源,那么这些独立的...
  • JAVA并发编程系列】并发框架 -- Executor 框架(线程池) 【1】合理使用线程池的优势 1. 降低资源消耗; 2. 提高响应速度; 3. 提高线程的可管理性; 【2】线程池的实现原理 当提交一个新任务到线程池时,...
  • 并发:Lock 框架详解 摘要:  我们已经知道,synchronized 是java的关键字,是Java的内置特性,在JVM层面实现了对临界资源的同步互斥访问,但 synchronized 粒度有些大,在处理实际问题时存在诸多局限性,...
  • 并发容器框架1.多线程中的容器  上图是Java中的集合框架结构,当我们在单线程中使用集合框架中,一切都是顺理成章的。但到了多线程环境,由于基础容器就没有做同步处理,可能会出现很多线程安全的问题。最常见的...
  • 译者:罗立树假如你生活在另外一个星球,我们最近开源了一套高性能的基于消息传递的开源框架。下面我给大家介绍一下如何将消息通过Ring buffer在无锁的情况下进行处理。在深入介绍之前,可以先快速阅读一下Trish发表...
  • 1.什么是阻塞队列 阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是 从队列里取元素的线程。阻塞队列就是生产者用来存放元素、...在阻塞队列不可用时,这两个附加操作提供了4种处理方式
  • akka是一系列框架,包括akka-actor, akka-remote, akka-cluster, akka-stream等,分别具有高并发处理模型——actor模型,远程通信,集群管理,流处理等功能。 akka支持scala和java等JVM编程语言。 akka actor akka ...
  • 它一个高性能的异步处理框架,号称“单线程每秒可处理600W个订单”的神器,本课程目标:彻底精通一个如此优秀的开源框架,面试秒杀面试官。本章会带领小伙伴们先了解课程大纲与重点,然后模拟千万,亿级数据进行压力...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,275
精华内容 510
关键字:

java并发处理框架

java 订阅