精华内容
下载资源
问答
  • CyclicBarrier

    2021-03-06 01:07:12
    CyclicBarrierDemo.java package thread; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { ... CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{ System.out.pr..



    CyclicBarrierDemo.java

    package thread;
    
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierDemo {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
                System.out.println("*****召唤神龙");
            });
            for (int i = 1; i <=7 ; i++) {
                final int tempInt=i;
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName()+
                            "\t 收集到第"+tempInt+"颗龙珠");
                    try{
                        cyclicBarrier.await();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                },String.valueOf(i)).start();
            }
        }
    }
    

    展开全文
  • CyclicBarrier

    2018-12-18 16:04:40
    1CyclicBarrier CyclicBarrier是让一组线程达到一个屏障(也叫做同步点),当这一组线程执行到达这个屏障(cyclicBarrier.await()代码处)时,这组线程才会继续往下执行。 CyclicBarrier比较适用于多线程计算的场景...

    1 CyclicBarrier

    CyclicBarrier是让一组线程达到一个屏障(也叫做同步点),当这一组线程执行到达这个屏障(cyclicBarrier.await()代码处)时,这组线程才会继续往下执行。

    CyclicBarrier比较适用于多线程计算的场景,当这些线程都执行到某一个预设地点以后,再执行另外的操作。例如,开多个线程批量处理数据,多所有数据都处理完成后再进行汇总分析的场景。

    CyclicBarrier和之前介绍的CountDownLatch比较类似,他们的主要区别是:CountDownLatch计数器只能使用一次,而CyclicBarrier计数器可以通过reset方法重置。CyclicBarrier还提供了一些查看阻塞线程数量(getNumberWaiting),判断线程是否被阻塞(isBroken)的方法。

     

     

    2  源码分析

    1)  构造函数

    函数parties参数指的是,等待多少个线程进入屏障点,即等待多少线程调用await方法,才算所有线程都达到屏障点。

    函数barrierAction指的是当所有线程达到这个屏障以后,将执行此barrierAction的内容。

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

     

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

     

    2)  await

    有两个版本,都是挂起当前线程,直到所有线程都达到屏障点时再继续执行。第二个版本支持让线程等待一定的时间,如果到时间以后还有线程没有达到屏障点,那么让已经达到屏障点的线程继续执行。

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

     

    3)  dowait

    屏障的核心逻辑都是在此方法中实现的。从以下代码中我们可以看到,他的主要处理逻辑(异常或者其他非主要逻辑忽略)如下:

     - 首先检查是否所有线程都达到屏障状态了,如果是,那么执行构造函数第二个参数barrierAction执行的任务。

     - 如果不是所有线程都达到屏障状态,那么当前线程挂起。

     - 如果线程挂起时指定了挂起时间,那么当时间到以后,此线程被唤醒,接着唤醒此时已经达到屏障状态的线程。

    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

     

     

    3 示例

    以下示例代码中,我们制定屏障数是3,所以当三个线程调用cyclicBarrier.await()时,所有挂起的线程将被唤醒,然后执行构造函数中指定的任务,当然被唤醒的线程也将继续执行。

    public static void main(String[] args) {
        int num = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(num, ()->{
            System.out.println("
    时间="+DateUtil.getCurrentTime()+" -回调线程- 工作线程到达屏障点后开始执行");
        });
        for (int i = 0; i < num; i++) {
            String name = "T-"+i;
            Thread thread = new Thread(() -> {
                try {
                    doTask();
                    System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 完成任务,进入屏障点,等待其他线程");
                    cyclicBarrier.await();
                    System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 所有线程已进入屏障点,继续执行");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            thread.start();
        }
    }
    private static void doTask(){
        Random random = new Random();
        long time = random.nextInt(3000) ;
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

     

    运行结果如下,符合上面的分析。

    时间=11:36:52:893 -工作线程- thread=T-0 完成任务,进入屏障点,等待其他线程

    时间=11:36:53:174 -工作线程- thread=T-1 完成任务,进入屏障点,等待其他线程

    时间=11:36:53:434 -工作线程- thread=T-2 完成任务,进入屏障点,等待其他线程

    时间=11:36:53:434 -回调线程- 工作线程到达屏障点后开始执行

    时间=11:36:53:434 -工作线程- thread=T-0 所有线程已进入屏障点,继续执行

    时间=11:36:53:434 -工作线程- thread=T-1 所有线程已进入屏障点,继续执行

    时间=11:36:53:434 -工作线程- thread=T-2 所有线程已进入屏障点,继续执行


     

     

     

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 7,346
精华内容 2,938
关键字:

cyclicbarrier