-
2018-02-15 17:49:39
在上一篇博客《生产者消费者模式之Lock与Condition》中已经对Condition的使用有所了解了,下面再举一个之前在网上看过的例子作为Condition的补充。
问题:假设有三个线程,一个主线程mainThread和两个子线程subThread1、subThread2,要求按顺序输出,mainThread先输出1——3,然后subThread1输出4——6,最后subThread2输出7——9,循环执行。
问题分析:有三个线程要顺序执行,所以我们可以定义三个Condition分别表示这三个线程,然后定义一个判断标记flag,flag==0表示mainThread、flag==1表示subThread1、flag==2表示subThread2。
package com.gk.thread.condition.demo; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionDemo { private static int count = 1; private Lock lock = new ReentrantLock(); private Condition mainCondition = lock.newCondition(); private Condition subCondition1 = lock.newCondition(); private Condition subCondition2 = lock.newCondition(); /* * flag = 0代表mainCondition * flag = 1代表subCondition1 * flag = 2代表subCondition2 */ private int flag = 0; /** * mainThread线程调的方法 */ public void mainThread() { lock.lock(); try { while(flag != 0) { mainCondition.await(); // 如果flag != 0,mainThread线程等待 } // 如果flag == 0,mainThread线程往下执行 System.out.println("第" + count++ + "次输出"); for(int i=1; i<=3; i++) { System.out.println("mainThread : " + i); } System.out.println(); // 修改flag = 1,并唤醒subCondition1执行 flag = 1; subCondition1.signal(); } catch (InterruptedException e) { throw new RuntimeException(e); }finally { lock.unlock(); } } /** * subThread1线程调的方法 */ public void subThread1() { lock.lock(); try { while(flag != 1) { subCondition1.await(); } for(int i=4; i<=6; i++) { System.out.println("subThread1 : " + i); } System.out.println(); flag = 2; subCondition2.signal(); } catch (InterruptedException e) { throw new RuntimeException(e); }finally { lock.unlock(); } } /** * subThread2线程调的方法 */ public void subThread2() { lock.lock(); try { while(flag != 2) { subCondition2.await(); } for(int i=7; i<=9; i++) { System.out.println("subThread2 : " + i); } System.out.println("\n=======================\n"); flag = 0; mainCondition.signal(); } catch (InterruptedException e) { throw new RuntimeException(e); }finally { lock.unlock(); } } }
测试代码:
package com.gk.thread.condition.demo; public class Test { public static void main(String[] args) { final int COUNT = 3; ConditionDemo cd = new ConditionDemo(); for (int i=1; i<=COUNT; i++) { new Thread(new Runnable() { @Override public void run() { cd.mainThread(); // mainThread线程 } }).start(); new Thread(new Runnable() { @Override public void run() { cd.subThread1(); // subThread1线程 } }).start(); new Thread(new Runnable() { @Override public void run() { cd.subThread2(); // subThread2线程 } }).start(); } } }
最后再友情提示一句:在API中有Condition实现阻塞队列的示例,读者可以体会其中的算法思想。
更多相关内容 -
condition:函数式条件判断语法,用于替代if else、case when语法
2021-05-08 12:30:21Condition.empty().when(false).set("1").orWhenEquals("1").toDo(p -> System.out.println("1")); Condition.of(1).whenEquals(1).toDo(p -> System.out.println("2")).elseDo(System.out::print); Condition.of... -
Python线程协作threading.Condition实现过程解析
2020-09-17 17:47:45主要介绍了Python线程协作threading.Condition实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 -
springboot通过@Condition注解类型完成加载配置内容
2020-06-16 15:02:33通过@Bean和@Condition 注解自定义对于的condition里面根据自定义的条件实现指定类注入到spring中;@ConditionalOnProperty可以根据配置文件中的 属性值不同将不同的类注入到spring中 该资源中案例完整,代码简单移动 -
A Full-Condition Monitoring Method for Nonstationary Dynamic Chemical Processes with Cointegration ...
2021-02-07 06:30:35A Full-Condition Monitoring Method for Nonstationary Dynamic Chemical Processes with Cointegration and Slow Feature Analysis -
MyBatisPlus条件构造器Condition的用法示例代码
2019-04-24 21:39:21MyBatisPlus条件构造器Condition的用法示例代码 -
python多线程高级锁condition简单用法示例
2021-01-02 15:02:58本文实例讲述了python多线程高级锁condition简单用法。分享给大家供大家参考,具体如下: 多线程编程中如果使用Condition对象代替lock, 能够实现在某个事件触发后才处理数据, condition中含有的方法: – wait:线程... -
Python多线程编程(七):使用Condition实现复杂同步
2020-12-23 22:13:21Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。 使用Condition的主要方式为:线程首先acquire一个... -
Spring实战之缓存使用condition操作示例
2020-08-25 03:53:50主要介绍了Spring实战之缓存使用condition操作,结合实例形式分析了Spring缓存使用condition具体配置、属性、领域模型等相关操作技巧与注意事项,需要的朋友可以参考下 -
Java编程中实现Condition控制线程通信
2020-08-28 21:06:54主要介绍了Java编程中实现Condition控制线程通信,简单介绍了Java中控制线程通信的方法,以及对condition的解析和实例,具有一定参考价值,需要的朋友可以了解下。 -
condition_variable源码以及详细分析.docx
2020-05-22 13:55:47详细介绍了线程同步条件变量condition_variable的使用和它的源码,涉及到unique_lock, mutex, lock_guard, 虚假唤醒和惊群效应。 -
Python线程条件变量Condition原理解析
2020-09-18 01:47:31主要介绍了Python线程条件变量Condition原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 -
Java多线程中ReentrantLock与Condition详解
2020-08-28 18:50:14主要介绍了Java多线程中ReentrantLock与Condition详解,需要的朋友可以参考下 -
Java并发编程之Condition源码分析(推荐)
2020-08-26 04:41:36主要介绍了Java并发编程之Condition源码分析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 -
Condition Monitoring and Faults Diagnosis of Induction Motors
2022-01-27 11:49:39Condition Monitoring and Faults Diagnosis of Induction Motors Electrical Signature Analysis -
基于SpringBoot核心原理(自动配置、事件驱动、Condition)
2020-09-07 18:22:11主要介绍了基于SpringBoot核心原理(自动配置、事件驱动、Condition),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧 -
Java多线程编程中使用Condition类操作锁的方法详解
2020-09-02 01:08:15Condition是java.util.concurrent.locks包下的类,提供了对线程锁的更精细的控制方法,下面我们就来看一下Java多线程编程中使用Condition类操作锁的方法详解 -
类似Object监视器方法的Condition接口(详解)
2020-08-30 09:58:40下面小编就为大家带来一篇类似Object监视器方法的Condition接口(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧 -
Java使用Condition控制线程通信的方法实例详解
2020-08-25 15:09:45主要介绍了Java使用Condition控制线程通信的方法,结合实例形式分析了使用Condition类同步检测控制线程通信的相关操作技巧,需要的朋友可以参考下 -
Condition详解
2021-04-03 01:49:05Condition Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒 condition的使用 //生产者消费者模型代码 @Override public void run() { ...Condition
Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒
condition的使用
//生产者消费者模型代码 @Override public void run() { while (true) { lock.lock(); try { while (goodsList.size() == maxCount) { System.out.println("生产者先等等。。。"); condition.await(); } Thread.sleep(30); goodsList.add(++a); System.out.println("生产者生产商品:" + a); condition.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } @Override public void run() { while (true) { lock.lock(); try { while (goodsList.isEmpty()) { System.out.println("消费者先等等。。。"); condition.await(); } Thread.sleep(30); System.out.println("消费者消费商品:" + goodsList.remove()); condition.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
condition 中两个最重要的方法,一个是 await,一个是 signal 方法
-
await:把当前线程阻塞挂起
-
signal:唤醒阻塞的线程
源码分析
调用 Condition,需要获得 Lock 锁,所以意味着会存在一个 AQS 同步队列,在上面那个案例中,假如两个线程同时运行的话,那么 AQS 的队列可能是下面这种情况,一个线程获得锁,另一个线程进入到同步队列
condition.wait()
//AQS队列中 AbstractQueuedSynchronizer public final void await() throws InterruptedException { if (Thread.interrupted())//如果被中断,则抛出异常 throw new InterruptedException(); //创建一个新的节点,节点的状态是condition,数据结构为链表 Node node = addConditionWaiter(); //释放当前的锁,得到锁的状态,并唤醒AQS队列中的一个线程 //并且缓存当前线程的state 后续唤醒时继续设置,考虑重入锁 int savedState = fullyRelease(node); int interruptMode = 0; //判断节点是否在同步队列 如果不在同步队列 则阻塞线程 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //当线程被唤醒后 继续执行以下逻辑 唤醒线程 //当这个线程醒来,会尝试拿锁,当acquireQueued返回false就是拿到锁了. //interruptMode != THROW_IE -> 表示这个线程被中断,但signal执行了enq方法让其入队了. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//-1 // 将这个变量设置成 REINTERRUPT. interruptMode = REINTERRUPT;//1 //如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) // 如果线程被中断了,需要根据transferAfterCancelledWait的返回结果判断怎么处理 reportInterruptAfterWait(interruptMode); }
addConditionWaiter()
//创建一个新的节点,节点的状态是condition,数据结构为单向链表 private Node addConditionWaiter() { Node t = lastWaiter; //如果lastWaiter节点为cancel状态,则清理掉 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //第一次进来的时候 lastwaiter为null //创建一个新的节点,节点的状态是condition,数据结构为链表 传ThreadB Node node = new Node(Thread.currentThread(), Node.CONDITION); //lastWaiter 设置firstWaiter为node if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
unlinkCancelledWaiters()
//从头开始遍历,将cancel状态的节点清理 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
fullyRelease(Node node)
//释放资源 final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); //释放方法同lock.unlock()一样 //不同的是,这里不管重入了几次都是一次释放 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
isOnSyncQueue(Node node)
//判断调用condition.await的线程是否在同步队列 final boolean isOnSyncQueue(Node node) { //如果waitStatus为CONDITION 或者prev为null一定在同步队列 因为condition等待队列为单项列表 且没有next节点 为nextwait节点 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; //循环遍历 return findNodeFromTail(node); }
findNodeFromTail(Node node)
//从后往前遍历,是因为添加到AQS队列是先设置prev连接 private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) //遍历到,直接返回 return true; if (t == null) //遍历完之后 没有则返回false return false; t = t.prev; } }
checkInterruptWhileWaiting(Node node)
//如果线程被中断,则执行transferAfterCancelledWait 否则返回0 //线程被中断后,通过transferAfterCancelledWait private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
如果当前线程被中断,则调用transferAfterCancelledWait方法判断后续的处理应该是抛出InterruptedException还是重新中断。
transferAfterCancelledWait(Node node)
final boolean transferAfterCancelledWait(Node node) { //cas设置节点awaitState为0,如果设置失败则表示线程cancel //使用 cas 修改节点状态,如果还能修改成功,说明线程被唤醒时,signal还没有被调用。 //这里有一个知识点,就是线程被唤醒,并不一定是在 java 层面执行了locksupport.unpark,也可能是调用了线程的 interrupt()方法,这个方法会更新一个中断标识,并且会唤醒处于阻塞状态下的线程。 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 如果 cas 成功,则把node 添加到 AQS 队列 enq(node); return true; } //如果cas失败,则判断当前node是否已经在AQS队列上,如果不在,则让给其他线程执行 //当node被触发了signal方法时,node就会被加到aqs队列上 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
该方法的返回值代表当前线程是否在park的时候被中断唤醒,如果为 true 表示中断在signal调用之前,signal还未执行,那么这个时候会根据await的语 义,在await时遇到中断需要抛出interruptedException,返回true就是告诉 checkInterruptWhileWaiting返回THROW_IE(-1)。 如果返回 false,否则表示signal已经执行过了,只需要 重新响应中断即可
condition.signal()
public final void signal() { //判断当前共享资源中的Thread是否和当前线程一致,否则抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //拿到等待队列的头节点 Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { //将下一个节点设置为头结点,并且将first.nextWaiter置空 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //设置ThreadA节点的waitStatus=0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //将ThreadA加入到AQS Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //唤醒ThreadA LockSupport.unpark(node.thread); return true; }
总结
await 和 signal 的总结
我把前面的整个分解的图再通过一张整体的结构图来表述,线程awaitThread先通过lock.lock()方法获取锁成功后调用了condition.await方法进入等待队列,而另一个线程signalThread通过lock.lock()方法获取锁成功后调用了 condition.signal或者signalAll方法,使得线程awaitThread能够有机会移入到同步队列中,当其他线程释放lock后使得线程awaitThread能够有机会获取lock,从而使得线程awaitThread能够从 await方法中退出执行后续操作。如果awaitThread获取lock失败会直接进入到同步队列。
阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁
释放:signal()后,节点会从 condition 队列移动到 AQS等待队列,则进入正常锁获取流程
流程图
-
-
前端项目-condition.zip
2019-09-03 13:06:01前端项目-condition,Advanced condition library -
neumann-boundary-condition:自然共形映射
2021-02-13 15:24:45neumann-boundary-condition:自然共形映射 -
An approach to generating test data for EFSM paths considering condition coverage
2021-02-09 07:03:05An approach to generating test data for EFSM paths considering condition coverage -
Concurrent monitoringof operating condition deviations and process dynamics anomalies with ...
2021-02-06 21:31:33Concurrent monitoringof operating condition deviations and process dynamics anomalies with slowfeature analysis -
New High-Precision Strapdown Navigation Attitude Algorithm under Angular-Rate Input Condition
2021-02-06 19:11:17New High-Precision Strapdown Navigation Attitude Algorithm under Angular-Rate Input Condition -
生产者-消费者(lock和condition).zip
2019-08-07 11:00:15通过java语言编写的生产者消费者,实现方法为lock类和condition类的配合完成。