精华内容
下载资源
问答
  • CyclicBarrier

    2021-12-10 20:48:28
    CyclicBarrier 字面意思回环栅栏,通过它可以实现让一组线程进行阻塞 当满足条件之后让这一组线程一起执行 CyclicBarrier的使用 构造方法 // parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 ...

    CyclicBarrier

    字面意思回环栅栏,通过它可以实现让一组线程进行阻塞 当满足条件之后让这一组线程一起执行

    CyclicBarrier的使用
    构造方法
    // parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
    public CyclicBarrier(int parties)
    // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)

    重要方法
    //屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
    // BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
    public int await() throws InterruptedException, BrokenBarrierException
    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

    //循环 通过reset()方法可以进行重置

    public void reset();

    CyclicBarrier应用场景
    CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。

    public class CyclicBarrierTest2 {
    
    //保存每个学生的平均成绩
    private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
    
    private ExecutorService threadPool= Executors.newFixedThreadPool(3);
    
    private CyclicBarrier cb=new CyclicBarrier(3,()->{
        int result=0;
        Set<String> set = map.keySet();
        for(String s:set){
            result+=map.get(s);
        }
        System.out.println("三人平均成绩为:"+(result/3)+"分");
    });
    
    
    public void count(){
        for(int i=0;i<3;i++){
            threadPool.execute(new Runnable(){
    
                @Override
                public void run() {
                    //获取学生平均成绩
                    int score=(int)(Math.random()*40+60);
                    map.put(Thread.currentThread().getName(), score);
                    System.out.println(Thread.currentThread().getName()
                            +"同学的平均成绩为:"+score);
                    try {
                        //执行完运行await(),等待所有学生平均成绩都计算完毕
                        cb.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
    
            });
        }
    }
    
    public static void main(String[] args) {
        CyclicBarrierTest2 cb=new CyclicBarrierTest2();
        cb.count();
    }
    

    利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景

    public class CyclicBarrierTest3 {
    
    public static void main(String[] args) {
    
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, 5, 1000, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                (r) -> new Thread(r, counter.addAndGet(1) + " 号 "),
                new ThreadPoolExecutor.AbortPolicy());
    
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
                () -> System.out.println("裁判:比赛开始~~"));
    
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.submit(new Runner(cyclicBarrier));
        }
    
    }
    static class Runner extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Runner (CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
    
        @Override
        public void run() {
            try {
                int sleepMills = ThreadLocalRandom.current().nextInt(1000);
                Thread.sleep(sleepMills);
                System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());
                cyclicBarrier.await();
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
        }
    }
    

    CountDownLatch与CyclicBarrier的区别

    CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
    CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
    CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、isBroken(用来知道阻塞的线程是否被中断)等方法。
    CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
    CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
    CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
    CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现

    展开全文
  • CyclicBarrier 的使用

    2021-01-30 17:27:14
    1.CyclicBarrier 字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时候,屏障才会开门。所有被屏障拦截的线程才会运行。 ...

    1.CyclicBarrier 字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时候,屏障才会开门。所有被屏障拦截的线程才会运行。

     

     

    2.常用的方法:

     

    复制代码

     
    CyclicBarrier(int parties)
    创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
    
    CyclicBarrier(int parties, Runnable barrierAction)
    创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
    
    int await()
    在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
    
    int await(long timeout, TimeUnit unit)
    在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
    
    int getNumberWaiting()
    返回当前在屏障处等待的参与者数目。
    
    int getParties()
    返回要求启动此 barrier 的参与者数目。
    
    boolean isBroken()
    查询此屏障是否处于损坏状态。
    
    void reset() 
    将屏障重置为其初始状态。如果调用了该函数,则在等待的线程将会抛出BrokenBarrierException异常。

    复制代码

    3.底层原理实现

     

     

    CyclicBarrier是由ReentrantLock可重入锁和Condition共同实现的。

    具体实现源码如下:

    复制代码

    //1.CyclicBarrier构造方法
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        // parties表示“必须同时到达barrier的线程个数”。
        this.parties = parties;
        // count表示“处在等待状态的线程个数”。
        this.count = parties;
        // barrierCommand表示“parties个线程到达barrier时,会执行的动作”。
        this.barrierCommand = barrierAction;
    }

    复制代码

    1. await源码分析:

    复制代码

     
    
    public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen;
            }
        }

    复制代码

    复制代码

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // 获取“独占锁(lock)”
        lock.lock();
        try {
            // 保存“当前的generation”
            final Generation g = generation;
    
            // 若“当前generation已损坏”,则抛出异常。
            if (g.broken)
                throw new BrokenBarrierException();
    
            // 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
    
           // 将“count计数器”-1
           int index = --count;
           // 如果index=0,则意味着“有parties个线程到达barrier”。
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   // 如果barrierCommand不为null,则执行该动作。
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   // 唤醒所有等待线程,并更新generation。
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }
    
            // 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,
            // 当前线程才继续执行。
            for (;;) {
                try {
                    // 如果不是“超时等待”,则调用await()进行等待;否则,调用awaitNanos()进行等待。
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 如果等待过程中,线程被中断,则执行下面的函数。
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
    
                // 如果“当前generation已经损坏”,则抛出异常。
                if (g.broken)
                    throw new BrokenBarrierException();
    
                // 如果“generation已经换代”,则返回index。
                if (g != generation)
                    return index;
    
                // 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程,并抛出TimeoutException异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放“独占锁(lock)”
            lock.unlock();
        }
    }

    复制代码

     

    实例如下:

    1.await()方法使用

    复制代码

    public class CyclicBarrierTest {
    
        private static int SIZE = 5;
    
        private static CyclicBarrier cb;
    
        public static void main(String[] args) {
            cb = new CyclicBarrier(SIZE);
            for (int i = 0; i < SIZE; i++) {
                new MyTask().start();
            }
    
        }
    
        static class MyTask extends Thread {
            @Override
            public void run() {
                try {
    
                    System.out.println("线程" + Thread.currentThread().getName() + "正在执行同一个任务");
                    // 以睡眠来模拟几个线程执行一个任务的时间
                    Thread.sleep(1000);
                    System.out.println("线程" + Thread.currentThread().getName() + "执行任务完成,等待其他线程执行完毕");
                    // 用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
                    cb.await();
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("所有线程写入完毕");
    
            }
    
        }
    
    }

    复制代码

    结果

     

    复制代码

    线程Thread-0正在执行同一个任务
    线程Thread-2正在执行同一个任务
    线程Thread-1正在执行同一个任务
    线程Thread-3正在执行同一个任务
    线程Thread-4正在执行同一个任务
    线程Thread-3执行任务完成,等待其他线程执行完毕
    线程Thread-4执行任务完成,等待其他线程执行完毕
    线程Thread-0执行任务完成,等待其他线程执行完毕
    线程Thread-1执行任务完成,等待其他线程执行完毕
    线程Thread-2执行任务完成,等待其他线程执行完毕
    所有线程写入完毕
    所有线程写入完毕
    所有线程写入完毕
    所有线程写入完毕
    所有线程写入完毕

    复制代码

     

     

    2.使用 barrier.await(2000, TimeUnit.MILLISECONDS)方法:

    复制代码

    public class CyclicBarrierWriteDataWaitTimeOutTest {
    
        private static final int THREAD_NUM = 5;
    
        private static final Random random = new Random();
    
        public static void main(String[] args) throws InterruptedException {
    
            // 使用构造方法:public CyclicBarrier(int parties, Runnable barrierAction)
            // 参数parties表示一共有多少线程参与这次“活动”,barrierAction是可选的,用来指定当所有线程都完成这些必须的“任务”之后要干的其他事情
            CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM, new Runnable() {
    
                @Override
                public void run() {
                    // 最后写完数据的线程,会执行这个任务
                    System.out.println(Thread.currentThread().getId() + ":所有线程写数据完毕!");
                }
            });
    
            // 启动5个线程,写数据
            for (int i = 0; i < THREAD_NUM; i++) {
                if (i < THREAD_NUM - 1) {
                    Thread t = new Thread(new MyTask(barrier));
                    t.start();
                } else {
                    // 最后一个线程延迟3秒执行
                    Thread.sleep(3000);
                    Thread t = new Thread(new MyTask(barrier));
                    t.start();
                }
    
            }
    
        }
    
        /**
         * 
         * (线程类)
         *
         * <p>
         * 修改历史:                                            <br>  
         * 修改日期            修改人员       版本             修改内容<br>  
         * -------------------------------------------------<br>  
         * 2016年8月26日 上午11:21:39   user     1.0        初始化创建<br>
         * </p> 
         *
         * @author        Peng.Li 
         * @version        1.0  
         * @since        JDK1.7
         */
        static class MyTask extends Thread {
    
            private CyclicBarrier barrier;
    
            public MyTask(CyclicBarrier barrier) {
                this.barrier = barrier;
            }
    
            @Override
            public void run() {
    
                int time = random.nextInt(1000);
                System.out.println(Thread.currentThread().getId() + ":需要" + time + "毫秒的时间写入数据");
    
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                System.out.println(Thread.currentThread().getId() + ":写入数据完毕,等待其他线程写入数据");
                try {
                    // 用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
                    // 等待所有线程都调用过此函数才能继续后续动作
                    // 只等待2s,那么最后一个线程3秒才执行,则必定会超时
                    barrier.await(2000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getId() + ":所有线程写入数据完毕,继续处理其他任务...");
    
            }
    
        }
    
    }

    复制代码

    从结果分析:

    可以看到,前面四个线程等待最后一个线程超时了,这个时候这四个线程不会再等待最后一个线程写入完毕,而是直接抛出BrokenBarrierException

    异常,继续执行后续的动作。最后一个线程完成写入数据操作后也继续了后续的动作。

    需要理解的是:最后一个线程发生超时的异常,其他的线程不会继续等待,而是去执行其他的任务。

    复制代码

    Thread-1:需要650毫秒的时间写入数据
    Thread-3:需要297毫秒的时间写入数据
    Thread-5:需要755毫秒的时间写入数据
    Thread-7:需要79毫秒的时间写入数据
    Thread-7:写入数据完毕,等待其他线程写入数据
    Thread-3:写入数据完毕,等待其他线程写入数据
    Thread-1:写入数据完毕,等待其他线程写入数据
    Thread-5:写入数据完毕,等待其他线程写入数据
    Thread-7:所有线程写入数据完毕,继续处理其他任务...
    Thread-1:所有线程写入数据完毕,继续处理其他任务...
    java.util.concurrent.TimeoutException
    Thread-3:所有线程写入数据完毕,继续处理其他任务...
    Thread-5:所有线程写入数据完毕,继续处理其他任务...
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
        at concurrentMy.CountAndCyclicAndSemaphore.cyclibarrier.test.CyclicBarrierWriteDataWaitTimeOutTest$MyTask.run(CyclicBarrierWriteDataWaitTimeOutTest.java:299)
        at java.lang.Thread.run(Thread.java:745)
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:243)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
        at concurrentMy.CountAndCyclicAndSemaphore.cyclibarrier.test.CyclicBarrierWriteDataWaitTimeOutTest$MyTask.run(CyclicBarrierWriteDataWaitTimeOutTest.java:299)
        at java.lang.Thread.run(Thread.java:745)
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:243)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
        at concurrentMy.CountAndCyclicAndSemaphore.cyclibarrier.test.CyclicBarrierWriteDataWaitTimeOutTest$MyTask.run(CyclicBarrierWriteDataWaitTimeOutTest.java:299)
        at java.lang.Thread.run(Thread.java:745)
    java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:243)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
        at concurrentMy.CountAndCyclicAndSemaphore.cyclibarrier.test.CyclicBarrierWriteDataWaitTimeOutTest$MyTask.run(CyclicBarrierWriteDataWaitTimeOutTest.java:299)
        at java.lang.Thread.run(Thread.java:745)
    Thread-9:需要164毫秒的时间写入数据
    Thread-9:写入数据完毕,等待其他线程写入数据java.util.concurrent.BrokenBarrierException
    Thread-9:所有线程写入数据完毕,继续处理其他任务...
    
        at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:200)
        at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
        at concurrentMy.CountAndCyclicAndSemaphore.cyclibarrier.test.CyclicBarrierWriteDataWaitTimeOutTest$MyTask.run(CyclicBarrierWriteDataWaitTimeOutTest.java:299)
        at java.lang.Thread.run(Thread.java:745)

    复制代码

     

    展开全文
  • 《ReentrantLock实现原理及源码分析》中介绍了AQS的同步队列独占模式实现的ReentranLock,这节将会介绍一下Semaphore、CountDownLatch和CyclicBarrier,并将会对源码进行分析。 一、Semaphore使用及源码分析 1、...

    前面《JUC并发核心AQS同步队列原理详解》介绍了AQS的同步等待队列的实现原理及源码分析,《ReentrantLock实现原理及源码分析》中介绍了AQS的同步队列独占模式实现的ReentranLock,这节将会介绍一下Semaphore、CountDownLatch和CyclicBarrier,并将会对源码进行分析。

    一、Semaphore使用及源码分析

    1、Semaphore的使用介绍

    Semaphore也就是我们常说的信号量, Semaphore可以控制同时访问的线程个数,通过 acquire 获取一个许可,如果没有就等待,通过 release 释放一个许可。有点类似限流的作用,可以用来做简单的单机版的限流。

    Semaphore的构造方法可以传入许可的数量,另外还可以传入一个boolean类型的参数用来执行公平、非公平模式,不传默认是非公平模式。

    下面是一个简单的程序示例:信号量初始为2,两个共享资源三个线程竞争,开始只有两个线程可以获得许可往下执行,有一个线程在acquire的地方被阻塞,前面的线程任何一个调用release释放资源之后,阻塞的线程才可以继续往下执行。

    public class SemaphoreTest {

        public static void main(String[] args) {
            Semaphore semaphore = new Semaphore(2);
            for (int i=0;i<3;i++){
                new Thread(()->{
                    try {
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
                        Thread.sleep(5000);
                        semaphore.release();
                        System.out.println(Thread.currentThread().getName()+":release() at time:"+System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                },"Thread"+i).start();
            }
        }
    }

    输出如下:

    Thread0:aquire() at time:1610780063431
    Thread1:aquire() at time:1610780063432
    Thread1:release() at time:1610780068433
    Thread2:aquire() at time:1610780068433
    Thread0:release() at time:1610780068433
    Thread2:release() at time:1610780073434 

     2、Semaphore的源码分析

    Semaphore是AQS同步队列的共享模式实现的,它的内部实现结构跟ReentrantLock很像,也是有一个内部类Sync,Sync是AbstractQueuedSynchronizer的子类,Sync有两个子类FairSync和NonFairSync,分别用来实现公平模式和非公平模式。

    创建Semaphore 实例的时候,需要一个参数 permits,这个是设置给 AQS的state的,然后每个线程调用 acquire 的时候,执行 state = state 1,release 的时候执行的时候执行 state = state + 1。当然,acquire的的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。

    注意:《JUC并发核心AQS同步队列原理详解》中关于AQS中的关键方法已经介绍过了,下面涉及AQS中的方法不再重复介绍。

    1、构造方法分析:创建了一个非公平模式的NonfairSync对象,传入许可证数量。NonfairSync的构造方法中调用了父类Sync的构造,就是把state的值设置为许可证的数量。

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    NonfairSync(int permits) {
        super(permits);
    }

    Sync(int permits) {
        setState(permits);
    }

     2、acquire方法源码分析

    我们只分析下非公平模式的流程,公平模式的代码差别不大,唯一的区别就是公平模式时会先判断一下队列上是否有等待的线程,如果有会进行排队。acquire方法调用了AbstractQueuedSynchronizer中的acquireSharedInterruptibly

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    acquireSharedInterruptibly方法如下所示:会调用NonFairSync实现的tryAcquireShared方法,如果该方法返回值小于0即没有可用获取的许可证,会调用AQS中的doAcquireSharedInterruptibly方法进行排队等待

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

     NonFairSync实现的tryAcquireShared方法如下所示:里面调用了父类Sync的nonfairTryAcquireShared方法

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }

     Sync的nonfairTryAcquireShared方法如下:获取当前state值即可用资源数量,然后通过当前数量-要获取数量计算出剩余的数量remaining,如果remaining小于0,说明获取资源失败,返回remaining的值,如果remaining>=0,则表示当前线程可以尝试获取资源,通过CAS的方式将state的值设置为remaining,如果CAS设置失败继续下一次循环,如果设置成功返回remaining的值(此时大于等于0)。

    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    3、release方法源码分析 

    release方法会调用AQS中的releaseShared方法

    public void release() {
        sync.releaseShared(1);
    }

    AQS中的releaseShared方法:先调用Sync中重写的tryReleaseShared方法释放资源即减state的值,如果返回true,会调用AQS中的doReleaseShared方法,doReleaseShared方法会唤醒同步队列中被阻塞的线程,线程被唤醒后会继续尝试获取资源。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    Sync中重写的tryReleaseShared方法: 获取当前state值current,将current加上释放的资源数量得到新值next,如果next<current说明求和得到的值超过了int的最大值导致溢出了,抛出异常;通过CAS的方式将state设置为新值,如果设置成功返回true,如果设置失败继续下一次循环。

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }

    二、CountDownLatch使用及源码分析

    1、CountDownLatch的使用介绍

    CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完毕再执行。从命名可以解读到countdown是倒数的意思,类似于我们倒计时的概念。

    CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量,计数器的值也是通过state实现的。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

    CountDownLatch常用方法:

    • CountDownLatch.countDown() 会将计数器数量减1

    • CountDownLatch.await() 计数器减到0之前,调用await方法的线程会一直阻塞

    使用示例如下:

    public class CountDownLatchTest {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(2);
            for (int i = 0; i < 2; i++) {
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName()+"执行任务完成");
                    latch.countDown();
                },"Thread"+i).start();
            }

            //等待线程池中的2个任务执行完毕,否则一直阻塞等待
            latch.await();
            System.out.println("线程池任务执行结束,主线程继续执行");
        }
    }

     控制台输出如下所示:可以看到线程池中任务执行完成之前,主线程一直在阻塞等待。

    Thread1执行任务完成
    Thread0执行任务完成
    线程池任务执行结束,主线程继续执行

    2、CountDownLatch的源码分析 

    CountDownLatch的源码结构其实跟上面的Semaphore差不多,主要区别在于其内部类Sync中重写的AQS中的tryAcquireShared和tryReleaseShared方法不一样,这里就不详细分析了。

    await方法会调用tryAcquireShared方法:如果state为0则可以往下执行,如果state不为0,说明计数器还没有减到0,说明线程得排队阻塞。

    countDown方法会调用tryReleaseShared方法:主要就是将state的值减1,如果减到0了则会唤醒队列中阻塞的线程。

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

    三、CyclicBarrier使用介绍

    CyclicBarrier也称为栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

    CyclicBarrier与CountDownLatch的功能比较相似,区别是CyclicBarrier的计数器可以重用。CyclicBarrier的构造方法会传一个数量parties,只有调用await的次数达到了parties时,await方法才会向下执行,否则调用await方法的地方会一直被阻塞。调用await的次数达到了parties后,线程继续向下执行,我们可以再次调用await方法,如果调用次数没有达到parties,会继续被阻塞,也就达到了计数器重用的功能。

    下面通过一个程序示例演示一下:

    public class CyclicBarrierDemo {
        public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{
                System.out.println("所有任务都已经到达屏障,可以开始执行了");
            });

            for (int i = 0; i < 2; i++) {
                new Thread(()->{
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"执行了");
                },"第一批Thread"+i).start();
            }

            cyclicBarrier.await();
            //休眠一下,防止输出打印混在一起
            Thread.sleep(100L);
            System.out.println("==========第一批任务都准备好了,主线程继续执行");
            //休眠一下,防止输出打印混在一起
            Thread.sleep(100L);

            for (int i = 0; i < 2; i++) {
                new Thread(()->{
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"执行了");
                },"第二批Thread"+i).start();
            }

            cyclicBarrier.await();
            //休眠一下,防止输出打印混在一起
            Thread.sleep(100L);
            System.out.println("--------------第二批任务都准备好了,主线程继续执行");
        }
    }

    输出如下所示:通过输出结果,可以看到了CyclicBarrier的await方法阻塞线程及计数器重用的特性。

    所有任务都已经到达屏障,可以开始执行了
    第一批Thread1执行了
    第一批Thread0执行了
    ==========第一批任务都准备好了,主线程继续执行
    所有任务都已经到达屏障,可以开始执行了
    第二批Thread1执行了
    第二批Thread0执行了
    --------------第二批任务都准备好了,主线程继续执行 

     关于CyclicBarrier的源码就不再介绍了,感兴趣的话可以自己查看分析一下。

    展开全文
  • 文章目录1 CyclicBarrier介绍2 入门:等待所有子任务结束3 CyclicBarrier的循环特性和源码解读4 CyclicBarrier的其他方法5 CyclicBarrier VS CountDownLatch 1 CyclicBarrier介绍 CyclicBarrier(循环屏障),它也是...

    1 CyclicBarrier介绍

    CyclicBarrier(循环屏障),它也是一个同步助手工具,它允许多个线程在执行完相应的操作之后彼此等待共同到达一个障点(barrier point)

    CyclicBarrier也非常适合用于某个串行化任务被分拆成若干个并行执行的子任务,当所有的子任务都执行结束之后再继续接下来的工作。从这一点来看,Cyclic Barrier与CountDownLatch非常类似,但是它们之间的运行方式以及原理还是存在着比较大的差异的,并且CyclicBarrier所能支持的功能CountDownLatch是不具备的:

    • CyclicBarrier可以被重复使用,而CountDownLatch当计数器为0的时候就无法再次利用

    ]

    2 入门:等待所有子任务结束

    注意和CountDownLatch的使用区别:并发工具:CountDownLatch

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author wyaoyao
     * @date 2021/4/20 11:09
     */
    public class CyclicBarrierDemo1 {
    
        public static void main(String[] args) {
            // 定义了一个CyclicBarrier,虽然要求传入大于0的int数字,但是它所代表的含义是“分片”而不再是计数器,虽然它的作用与计数器几乎类似。
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
            // 构建三个线程
            List<Thread> threads = new ArrayList<>();
            for (int i = 1; i <= 3; i++){
                int finalI = i;
                Thread thread = new Thread(() -> {
                    try {
                        // 模拟线程执行
                        TimeUnit.SECONDS.sleep(finalI);
                        System.out.println(Thread.currentThread().getName() + " 执行结束");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // 在此等待其他子线程到达barrier point
                        try {
                            cyclicBarrier.await();
                            // 还会抛出BrokenBarrierException异常
                        } catch (BrokenBarrierException | InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, "T-" + i);
                threads.add(thread);
                thread.start();
            }
            // 等待所有子任务线程结束
            threads.forEach(t-> {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            System.out.println("三个子线程全部执行结束,主线程可以继续执行了");
        }
    }
    

    输出:

    T-1 执行结束
    T-2 执行结束
    T-3 执行结束
    三个子线程全部执行结束,主线程可以继续执行了
    
    • 调用await方法等待其他子线程也运行结束到达一个共同的barrier point,该await方法还会返回一个int的值,该值所代表的意思是当前任务到达的次序
    • 逐一调用每一个子线程的join方法,使当前线程进入阻塞状态等待所有的子线程运行结束。

    和CountDownLatch执行方式的差异:

    • CyclicBarrier,在子任务线程中,当执行结束后调用await方法使当前的子线程进入阻塞状态,直到其他所有的子线程都结束了任务的运行之后,它们才能退出阻塞
    • CountDownLatch,是主线程等待CountDownLatch计数器降为0,退出阻塞

    可以通过一个小技巧使代码变得更加简洁:

    public static void main(String[] args) {
        // 定义4个分片,让主线程也成为一个子任务
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
        // 构建三个线程
        for (int i = 1; i <= 3; i++){
            int finalI = i;
            Thread thread = new Thread(() -> {
                try {
                    // 模拟线程执行
                    TimeUnit.SECONDS.sleep(finalI);
                    System.out.println(Thread.currentThread().getName() + " 执行结束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }, "T-" + i);
            thread.start();
        }
        try {
        	// 在主线程中调用await方法,等待其他子任务线程也到达barrier point
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("三个子线程全部执行结束,主线程可以继续执行了");
    }
    

    3 CyclicBarrier的循环特性和源码解读

    CyclicBarrier的另一个很好的特性是可以被循环使用,也就是说当其内部的计数器为0之后还可以在接下来的使用中重置而无须重新定义一个新的

    简单演示,刚刚的逻辑执行两遍,使用同一个CyclicBarrier

    public static void main(String[] args) {
        // 定义4个分片,让主线程也成为一个子任务
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
        // 构建三个线程
        for (int i = 1; i <= 3; i++){
            int finalI = i;
            Thread thread = new Thread(() -> {
                try {
                    // 模拟线程执行
                    TimeUnit.SECONDS.sleep(finalI);
                    System.out.println(Thread.currentThread().getName() + " 执行结束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }, "T-" + i);
            thread.start();
        }
        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("三个子线程全部执行结束,主线程可以继续执行了");
    
        // 重新构建三个线程
        for (int i = 1; i <= 3; i++){
            int finalI = i;
            Thread thread = new Thread(() -> {
                try {
                    // 模拟线程执行
                    TimeUnit.SECONDS.sleep(finalI);
                    System.out.println(Thread.currentThread().getName() + " 执行结束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    try {
                    	// 使用同一个CyclicBarrier
                        cyclicBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }, "T-C-" + i);
            thread.start();
        }
        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("三个子线程全部执行结束,主线程可以继续执行了");
    }
    

    输出:

    T-1 执行结束
    T-2 执行结束
    T-3 执行结束
    三个子线程全部执行结束,主线程可以继续执行了
    T-C-1 执行结束
    T-C-2 执行结束
    T-C-3 执行结束
    三个子线程全部执行结束,主线程可以继续执行了
    

    使用同一个CyclicBarrier来进行控制的,在这里需要注意的是,在主线程中的两次await中间为何没有对barrier进行reset的操作,那是因为在CyclicBarrier内部维护了一个count。当所有的await调用导致其值为0的时候,reset相关的操作会被默认执行:
    看一下源码:

    public int await()
        throws InterruptedException, BrokenBarrierException{
        try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
    }
    
    private int dowait(boolean timed, long nanos)
       throws InterruptedException, BrokenBarrierException,
              TimeoutException {
    ...省略
           int index = --count;
           / 当count为0的时候
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   // 生成新的Generation,并且直接返回
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }
    ...省略
       }
    }
    private void nextGeneration() {
        // 唤醒阻塞中的所有线程
        trip.signalAll();
        // set up next generation
        // 修改count的值使其等于构造CyclicBarrier转入的parties值
        count = parties;
        // 创建新的Generation
        generation = new Generation();
    }
    

    当count的值为0的时候,最后会重新生成新的Generation,并且将count的值设定为构造CyclicBarrier转入的parties值。

    接下里看一下CyclicBarrier reset的源码片段。

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 调用break barrier方法
            breakBarrier();   // break the current generation
            // 重新生成新的generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
    
    private void breakBarrier() {
        // generation的broken设置为true,标识该barrier已经被broken了
        generation.broken = true;
        // 重置count的值
        count = parties;
        // 唤醒阻塞的其他线程
        trip.signalAll();
    }
    

    虽然在reset方法中调用了breakBarrier方法和唤醒其他新阻塞线程,但是它们都会被忽略掉,根本不会影响到dowait方法中的线程(因为执行该方法的线程已经没有了),紧接着generation又会被重新创建,因此在主线程的两次await方法调用之间完全可以不用调用reset方法

    4 CyclicBarrier的其他方法

    /**
    构造CyclicBarrier并且传入parties。
    **/
    CyclicBarrier(int parties)
    /**
    构造CyclicBarrier不仅传入parties,而且指定一个Runnable接口,
    当所有的线程到达barrier point的时候,该Runnable接口会被调用,
    有时我们需要在所有任务执行结束之后执行某个动作,这时就可以使用这种CyclicBarrier的构造方式了。
    **/
    CyclicBarrier(int parties, Runnable barrierAction);
    
    /**
    获取CyclicBarrier在构造时的parties,该值一经CyclicBarrier创建将不会被改变。
    **/
    int getParties();
    /**
    调用该方法之后,当前线程将会进入阻塞状态,等待其他线程执行await()方法进入barrier point,
    进而全部退出阻塞状态,
    当CyclicBarrier内部的count为0时,调用await()方法将会直接返回而不会进入阻塞状态。
    **/
    int await() throws InterruptedException, BrokenBarrierException 
    
    /**
    与无参的await方法类似,只不过增加了超时的功能
    当其他线程在设定的时间内没有到达barrier point时,当前线程也会退出阻塞状态
    **/
    int await(long timeout, TimeUnit unit)
    /**
    返回barrier的broken状态,某个线程由于执行await方法而进入阻塞状态,
    如果该线程被执行了中断操作,那么isBroken()方法将会返回true。
    **/
    boolean isBroken()
    /**
    该方法返回当前barrier有多少个线程执行了await方法
    而不是还有多少个线程未到达barrier point
    **/
    int getNumberWaiting()
    /**
    其主要作用是中断当前barrier,并且重新生成一个generation,
    还有将barrier内部的计数器count设置为parties值,
    但是需要注意的是,如果还有未到达barrier point的线程,
    则所有的线程将会被中断并且退出阻塞,此时isBroken()方法将返回false而不是true。
    **/
    void reset()
    

    演示isBroken方法

    public static void main(String[] args) throws InterruptedException {
        final CyclicBarrier barrier = new CyclicBarrier(2);
        Thread thread = new Thread(() -> {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                System.out.println("捕获中断信号");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        // 两秒后在main线程中执行thread的中断操作
        TimeUnit.SECONDS.sleep(2);
        // 调用中断
        thread.interrupt();
        // 短暂休眠,确保thread的执行动作发生在main线程读取broken状态之前
        TimeUnit.SECONDS.sleep(2);
        // 输出barrier的broken状态,这种情况下该返回值肯定为true
        System.out.println(barrier.isBroken());
    }
    

    可见:

    • 当一个线程由于在执行CyclicBarrier的await方法而进入阻塞状态时,这个时候对该线程执行中断操作会导致CyclicBarrier被broken。
    • 被broken的CyclicBarrier此时已经不能再直接使用了,如果想要使用就必须使用reset方法对其重置。
    • 如果有其他线程此时也由于执行了await方法而进入阻塞状态,那么该线程会被唤醒并且抛出BrokenBarrierException异常。

    演示reset方法

    public static void main(String[] args) throws InterruptedException {
         final CyclicBarrier barrier = new CyclicBarrier(3);
    
         Thread thread = new Thread(() ->
         {
             try {
                 barrier.await();
             } catch (InterruptedException | BrokenBarrierException e) {
                 e.printStackTrace();
             }
         });
         thread.start();
         TimeUnit.SECONDS.sleep(2);
         // 执行reset方法,thread线程将会被中断
         barrier.reset();
         // 此时isBroken()为false而不是true
         System.out.println(barrier.isBroken());
     }
    

    输出:

    false
    java.util.concurrent.BrokenBarrierException
    	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    	at study.wyy.juc.utils.CyclicBarrierDemo2.lambda$main$0(CyclicBarrierDemo2.java:44)
    	at java.lang.Thread.run(Thread.java:748)
    

    5 CyclicBarrier VS CountDownLatch

    CyclicBarrier和CoundDownLatch两者都可用于管理和控制子任务线程的执行,在某些场景下,它们都可以实现类似的功能,但是它们在本质上存在着很多差别:

    • CoundDownLatch的await方法会等待计数器被count down到0,而执行CyclicBarrier的await方法的线程将会等待其他线程到达barrier point。
    • CyclicBarrier内部的计数器count是可被重置的,进而使得CyclicBarrier也可被重复使用,而CoundDownLatch则不能。
    • CyclicBarrier是由Lock和Condition实现的,而CountDownLatch则是由同步控制器AQS(AbstractQueuedSynchronizer)来实现的。
    • 在构造CyclicBarrier时不允许parties为0,而CountDownLatch则允许count为0
    展开全文
  • 一说到并发编程、多线程,我们总会想到CountDownLatch、CyclicBarrier和Semaphore,今天我们探讨下它们之间的区别及底层原理。
  • CyclicBarrier原理详解

    2021-06-15 14:53:46
    介绍 一种同步辅助工具,允许一组线程都等待彼此到达一个共同的障碍点。这个屏障被称为循环,因为它可以在等待的线程被释放后重新使用,之前分析过CountDownLatch,...通过一个简单的例子来理解一下CyclicBarrier,例
  • 原标题:Java并发编程之CyclicBarrier和线程池的使用下面我们来讲述一下线程池和CyclicBarrier的使用和对比。一、场景描述有四个游戏玩爱好者玩游戏,游戏中有三个关卡,每一个关卡必须让所有玩家到达后才能允许通过...
  • 目录 前言 Maven依赖 代码 ...工作中是否有这样的场景,多个...之后再提供一个循环屏障,CyclicBarrier,更优雅的实现工具。 Maven依赖 可以依赖,也可以不依赖,只是代码要稍微多一些,最好添加。 <
  • CountDownLatch和CyclicBarrier区别CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,把它们放...
  • 介绍 斗地主是一个非常有意思的娱乐活动,但是斗地主必须够3个人才能开始,每次凑够3个人就...当然有,这就是CyclicBarrier。 我们用CyclicBarrier模拟一下上面的场景 public class CyclicBarrierUseCase1 { public.
  • CyclicBarrier 可循环屏障,作用是使得一组线程相互等待,直到最后一个线程到达屏障后,释放所有阻塞在该屏障上的线程,用于线程同步通信。 public class CyclicBarrier { private static class Generation { ...
  • cyclicBarrier

    2021-04-30 13:53:34
    cyclicBarrier与countDownLatch相反 countDownLatch是从初始值减到0 cyclicBarrier是从0加到初始值 cyclicBarrier先到的线程先进行等待,只有达到parties的个数,就可以执行构造的方法 构造器: 1.public ...
  • CyclicBarrier 让所有线程等待命令在继续执行 字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这...
  • 在项目中看到用到了CountDownLatch和CyclicBarrier,这两个作用非常相似,这里写一些demo区分一下两者和熟悉一下API。 1.CountDownLatch的使用 描述: CountDownLatch是同步辅助工具类,允许一个或多个线程等待,...
  • Share 共享式 多个线程可同时执行,如 Semaphore、CountDownLatch、 CyclicBarrier、ReadWriteLock; ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时...
  • 最近又在重读CyclicBarrier源码,并进行了深入分析,重点源码也自己跟过并做了一些注释,仅供大家参考。 CyclicBarrier:回环栅栏(有人也称之为循环屏障),通过他可以让一组线程等待至某个状态(屏障点)之后再全部...
  • 点击关注公众号,实用技术文章及时了解来源:blog.csdn.net/zyzzxycj/article/details/90241892前言CyclicBarrier和CountDownL...
  • CyclicBarrier cyclicBarrier = new CyclicBarrier(appIdInfoListStart.size()); try { for (int i = 0;i ();i++) { String key = "Tiktok_JOB_" + appIdInfoListStart.get(i).getAppId(); if (!threadMap....
  • public class CyclicBarrierTest { public static void main(String[] args) throws IOException, InterruptedException { CyclicBarrier barrier = new CyclicBarrier(3, () -> { System.out.println("栅栏已达到...
  • CyclicBarrier加法计数器

    2021-10-03 20:41:44
    CyclicBarrier可以认为是一个栅栏。一共4个线程A、B、C、D,它们到达栅栏的顺序可能各不相同。当A、B、C到达栅栏后,由于没有满足总数【4】的要求,所以会一直等待,当线程D到达后,栅栏才会放行。 public ...
  • CyclicBarrier(屏障)

    2021-08-26 22:36:11
    java.util.concurrent.CyclicBarrier 概述: CyclicBarrier的字面意思是可循环使用(cyclic)的屏障(barrier) 它要做的事情,是让一组线程到达一个屏障(也叫做同步点)时被阻塞, 直到最后一个线程到达屏障时,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 31,351
精华内容 12,540
关键字:

cyclicbarrier