-
Java多线程之间如何进行通信?
2021-01-02 10:57:59面试题:新建 T1、T2、T3 三个线程,如何保证它们按顺序执行? 代码如下: package com.autocoding.juc; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; /** * * 面试题:新建 ...目录
1、Object.wait()与Object.notify()/notifyAll()
3、Condition.await() Condition.signal()/signalAll()
4、CountDownLatch/CyclicBarrier
本文关注Java多线程之间,如何进行通信?当一个复杂任务由多个线程共同完成时,线程与线程之间,如何进行协作,本文主要以代码的形式演示Java中常用的工具,以实现多线程之间通信。
而多线程实现对互斥资源的访问,请参考我的另一篇文章:Java锁汇总1、Object.wait()与Object.notify()/notifyAll()
参考文档:如何在 Java 中正确使用 wait, notify 和 notifyAll – 以生产者消费者模型为例
面试题:利用Obejct.wait()与Object.notify()/notifyAll()来实现生产者消费者模型
package com.autocoding.juc; import java.util.LinkedList; import java.util.Queue; import java.util.Random; /** * Simple Java program to demonstrate How to use wait, notify and notifyAll() * method in Java by solving producer consumer problem. * * @author Javin Paul */ public class WaitNotifyTest { public static void main(String args[]) { System.out.println("How to use wait and notify method in Java"); System.out.println("Solving Producer Consumper Problem"); final Queue<Integer> buffer = new LinkedList<>(); final int maxSize = 10; final Thread producer = new Producer(buffer, maxSize, "PRODUCER"); final Thread consumer = new Consumer(buffer, maxSize, "CONSUMER"); producer.start(); consumer.start(); } } /** * Producer Thread will keep producing values for Consumer to consumer. It will * use wait() method when Queue is full and use notify() method to send * notification to Consumer Thread. * * @author WINDOWS 8 * */ class Producer extends Thread { private final Queue<Integer> queue; private final int maxSize; public Producer(Queue<Integer> queue, int maxSize, String name) { super(name); this.queue = queue; this.maxSize = maxSize; } @Override public void run() { while (true) { synchronized (queue) { while (queue.size() == maxSize) { try { System.out.println("Queue is full, " + "Producer thread waiting for " + "consumer to take something from queue"); queue.wait(); } catch (final Exception ex) { ex.printStackTrace(); } } final Random random = new Random(); final int i = random.nextInt(); System.out.println("Producing value : " + i); queue.add(i); queue.notifyAll(); } } } } /** * Consumer Thread will consumer values form shared queue. It will also use * wait() method to wait if queue is empty. It will also use notify method to * send notification to producer thread after consuming values from queue. * * @author WINDOWS 8 * */ class Consumer extends Thread { final private Queue<Integer> queue; final private int maxSize; public Consumer(Queue<Integer> queue, int maxSize, String name) { super(name); this.queue = queue; this.maxSize = maxSize; } @Override public void run() { while (true) { synchronized (queue) { while (queue.isEmpty()) { System.out.println("Queue is empty," + "Consumer thread is waiting" + " for producer thread to put something in queue"); try { queue.wait(); } catch (final Exception ex) { ex.printStackTrace(); } } System.out.println("Consuming value : " + queue.remove()); queue.notifyAll(); } } } }
2、Thread.join()
面试题:新建 T1、T2、T3 三个线程,如何保证它们按顺序执行?
代码如下:package com.autocoding.juc; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; /** * * 面试题:新建 T1、T2、T3 三个线程,如何保证它们按顺序执行? 期望执行顺序:T1、T2、T3 */ @Slf4j public class Thread$JoinTest { private static T1 t1 = new T1("T1"); private static T2 t2 = new T2("T2"); private static T3 t3 = new T3("T3"); public static void main(String[] args) { Thread$JoinTest.t3.start(); Thread$JoinTest.t2.start(); Thread$JoinTest.t1.start(); } private static class T1 extends Thread { public T1(String name) { super(name); } @Override public void run() { Thread$JoinTest.log.info("当前线程T1:开始运行"); try { TimeUnit.SECONDS.sleep(5); } catch (final InterruptedException e) { e.printStackTrace(); } Thread$JoinTest.log.info("当前线程T1:结束运行"); } } private static class T2 extends Thread { public T2(String name) { super(name); } @Override public void run() { try { Thread$JoinTest.t1.join(); } catch (final InterruptedException e) { e.printStackTrace(); } Thread$JoinTest.log.info("当前线程T2:开始运行"); try { TimeUnit.SECONDS.sleep(5); } catch (final InterruptedException e) { e.printStackTrace(); } Thread$JoinTest.log.info("当前线程T2:结束运行"); } } private static class T3 extends Thread { public T3(String name) { super(name); } @Override public void run() { try { Thread$JoinTest.t2.join(); } catch (final InterruptedException e) { e.printStackTrace(); } Thread$JoinTest.log.info("当前线程T3:开始运行"); try { TimeUnit.SECONDS.sleep(5); } catch (final InterruptedException e) { e.printStackTrace(); } Thread$JoinTest.log.info("当前线程T3:结束运行"); } } }
运行结果:
3、Condition.await() Condition.signal()/signalAll()
面试题:利用 Condition.await() Condition.signal()/signalAll() 来实现生产者消费者模型
package com.autocoding.juc.threadmessaging; import java.util.LinkedList; import java.util.Queue; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ConditionTest { private static final ReentrantLock BUFFER_LOCK = new ReentrantLock(); private static final Condition BUFFER_CONDITION = ConditionTest.BUFFER_LOCK.newCondition(); public static void main(String args[]) { final Queue<Integer> buffer = new LinkedList<>(); final int maxSize = 10; final Thread producer = new Producer(buffer, maxSize, "PRODUCER"); final Thread consumer = new Consumer(buffer, maxSize, "CONSUMER"); producer.start(); consumer.start(); } private static class Producer extends Thread { private final Queue<Integer> queue; private final int maxSize; public Producer(Queue<Integer> queue, int maxSize, String name) { super(name); this.queue = queue; this.maxSize = maxSize; } @Override public void run() { while (true) { ConditionTest.BUFFER_LOCK.lock(); try { while (queue.size() == maxSize) { try { System.out.println("Queue is full, " + "Producer thread waiting for " + "consumer to take something from queue"); ConditionTest.BUFFER_CONDITION.await(); } catch (final Exception ex) { ex.printStackTrace(); } } final Random random = new Random(); final int i = random.nextInt(); System.out.println("Producing value : " + i); queue.add(i); ConditionTest.BUFFER_CONDITION.signalAll(); } finally { ConditionTest.BUFFER_LOCK.unlock(); } } } } private static class Consumer extends Thread { final private Queue<Integer> queue; final private int maxSize; public Consumer(Queue<Integer> queue, int maxSize, String name) { super(name); this.queue = queue; this.maxSize = maxSize; } @Override public void run() { while (true) { ConditionTest.BUFFER_LOCK.lock(); try { while (queue.isEmpty()) { try { System.out.println("Queue is empty," + "Consumer thread is waiting" + " for producer thread to put something in queue"); ConditionTest.BUFFER_CONDITION.await(); } catch (final Exception ex) { ex.printStackTrace(); } } System.out.println("Consuming value : " + queue.remove()); ConditionTest.BUFFER_CONDITION.signalAll(); } finally { ConditionTest.BUFFER_LOCK.unlock(); } } } } }
程序运行结果:
4、CountDownLatch/CyclicBarrier
4.1 CountDownLatch
场景:百米赛跑,4名运动员选手到达场地等待裁判口令,裁判一声口令,选手听到后同时起跑,当所有选手到达终点,裁判进行汇总排名。
package com.autocoding.juc.threadmessaging; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 场景:百米赛跑,4名运动员选手到达场地等待裁判口令,裁判一声口令,选手听到后同时起跑,当所有选手到达终点,裁判进行汇总排名。 * * @author Administrator * */ public class CountdownLatchTest { public static void main(String[] args) { final ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch beginningCDL = new CountDownLatch(1); final CountDownLatch endingCDL = new CountDownLatch(4); for (int i = 0; i < 4; i++) { final Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("选手" + Thread.currentThread().getName() + "正在等待裁判发布口令"); beginningCDL.await(); System.out.println("选手" + Thread.currentThread().getName() + "已接受裁判口令"); Thread.sleep((long) (Math.random() * 10000)); System.out.println("选手" + Thread.currentThread().getName() + "到达终点"); endingCDL.countDown(); } catch (final InterruptedException e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("裁判" + Thread.currentThread().getName() + "即将发布口令"); beginningCDL.countDown(); System.out.println("裁判" + Thread.currentThread().getName() + "已发送口令,正在等待所有选手到达终点"); endingCDL.await(); System.out.println("所有选手都到达终点"); System.out.println("裁判" + Thread.currentThread().getName() + "汇总成绩排名"); } catch (final InterruptedException e) { e.printStackTrace(); } service.shutdown(); } }
程序运行结果:
4.2 CyclicBarrier
package com.autocoding.juc.threadmessaging; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 场景:百米赛跑,4名运动员选手到达场地等待裁判口令,裁判一声口令,选手听到后同时起跑,当所有选手到达终点,裁判进行汇总排名。 * * @author Administrator * */ public class CyclicBarrierTest { public static void main(String[] args) { final ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier beginningCyclicBarrier = new CyclicBarrier(4, new Runnable() { @Override public void run() { System.out.println("所有选手,都准备好了,比赛可以开始了"); } }); final CountDownLatch endingCDL = new CountDownLatch(4); for (int i = 0; i < 4; i++) { final Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("选手" + Thread.currentThread().getName() + "准备好了...."); try { beginningCyclicBarrier.await(); } catch (final BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("选手" + Thread.currentThread().getName() + "已接受裁判口令"); Thread.sleep((long) (Math.random() * 10000)); System.out.println("选手" + Thread.currentThread().getName() + "到达终点"); endingCDL.countDown(); } catch (final InterruptedException e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long) (Math.random() * 10000)); endingCDL.await(); System.out.println("所有选手都到达终点"); System.out.println("裁判" + Thread.currentThread().getName() + "汇总成绩排名"); } catch (final InterruptedException e) { e.printStackTrace(); } service.shutdown(); } }
程序运行结果:
5、BlockingQueue
参考文档:使用blockingqueue实现的简单生产者消费者模型
6、Semaphore
利用Semaphore将Set装饰为一个BlockingSet,代码示例,初始化一个容量为5的Set,当向BlockingSet中继续添加元素时,阻塞,当元素删除元素时,又可以继续添加元素了,直到当前元素容量为5 。
请记住Guava中的RateLimiter也是基于Semaphore来实现的。package com.autocoding.juc.threadmessaging; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Semaphore; /** * 利用JDK中信号量来将SET修饰为一个BlockingSet * @ClassName: SemaphoreTest * @author: QiaoLi * @date: Jan 6, 2021 11:10:14 AM */ public class SemaphoreTest { public static void main(String[] args) throws InterruptedException { BlockingSet<Integer> blockingSet = new BlockingSet<>(new HashSet<Integer>(), 5); for (int i = 1; i <= 10; i++) { blockingSet.add(i); } System.err.println("blockingSet:" + blockingSet); } private static class BlockingSet<T> implements Set<T> { private Set<T> set = new HashSet<T>(); private final Semaphore semaphore; public BlockingSet(Set<? extends T> set, Integer capacity) { if (null == set) { throw new IllegalArgumentException("set is null"); } if (null == capacity) { throw new IllegalArgumentException("capacity is null"); } this.set.addAll(set); this.semaphore = new Semaphore(capacity); } @Override public boolean add(Object e) { System.out.println("开始存储元素:" + e); try { semaphore.acquire(); } catch (InterruptedException e1) { System.err.println(e1.getMessage()); } boolean addFlag = false; try { addFlag = set.add((T) e); return addFlag; } finally { if (!addFlag) { semaphore.release(); } System.err.println("结束存储元素:" + e); } } @Override public boolean remove(Object o) { System.out.println("开始删除元素:" + o); boolean removedFlag = set.remove(o); if (removedFlag) { semaphore.release(); } System.out.println("结束删除元素:" + o); return removedFlag; } @Override public boolean addAll(Collection c) { boolean modified = false; for (Object e : c) { if (add(e)) { modified = true; } } return modified; } ........... 删除无关代码 } }
程序运行结果:
7、AbstractQueuedSynchronizer
利用AbstractQueuedSynchronizer来实现一个阻塞式的锁,源码如下:
package com.autocoding.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; class MyLock implements Lock { private final Sync sync = new Sync(); @Override public void lock() { this.sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { this.sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return this.sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return this.sync.tryAcquireNanos(1, TimeUnit.NANOSECONDS.convert(time, unit)); } @Override public void unlock() { this.sync.release(1); } @Override public Condition newCondition() { return this.sync.newCondition(); } private static class Sync extends AbstractQueuedSynchronizer { // 等待状态 private static int wating = 1; // 结束等待状态 private static int done = 2; public Sync() { this.setState(Sync.done); } @Override protected boolean tryAcquire(int permit) { if (this.getState() == Sync.done) { if (this.compareAndSetState(Sync.done, Sync.wating)) { return true; } } return false; } @Override protected boolean tryRelease(int permits) { if (this.getState() == Sync.wating) { if (this.compareAndSetState(Sync.wating, Sync.done)) { return true; } } return false; } final ConditionObject newCondition() { return new ConditionObject(); } } }
测试代码如下:
package com.autocoding.lock; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import lombok.extern.slf4j.Slf4j; @Slf4j public class MyLockTest { private static Lock lock = new MyLock(); public static void main(String[] args) { final List<Thread> threadList = new ArrayList<>(); // step 1:创建10个线程 for (int i = 1; i <= 5; i++) { threadList.add(new Thread(new Runnable() { @Override public void run() { try { MyLockTest.lock.lock(); MyLockTest.log.info("当前线程【{}】获取到锁,开始执行", Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(5); } catch (final Exception e) { System.err.println(e.getMessage()); } finally { MyLockTest.log.info("当前线程【{}】释放锁,结束执行", Thread.currentThread().getName()); MyLockTest.lock.unlock(); } } }, "thead-" + i)); } // step 2:启动10个线程 for (final Thread thread : threadList) { thread.start(); } // step 3:主线程等待这10个线程执行完成 for (final Thread thread : threadList) { try { thread.join(); } catch (final InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
程序运行结果:
2021-01-30 11:21:10 [INFO] com.autocoding.lock.MyLockTest$1.run(MyLockTest.java:26) thead-1 com.autocoding.lock.MyLockTest 当前线程【thead-1】获取到锁,开始执行 2021-01-30 11:21:15 [INFO] com.autocoding.lock.MyLockTest$1.run(MyLockTest.java:31) thead-1 com.autocoding.lock.MyLockTest 当前线程【thead-1】释放锁,结束执行 2021-01-30 11:21:15 [INFO] com.autocoding.lock.MyLockTest$1.run(MyLockTest.java:26) thead-2 com.autocoding.lock.MyLockTest 当前线程【thead-2】获取到锁,开始执行 2021-01-30 11:21:20 [INFO] com.autocoding.lock.MyLockTest$1.run(MyLockTest.java:31) thead-2 com.autocoding.lock.MyLockTest 当前线程【thead-2】释放锁,结束执行 2021-01-30 11:21:20 [INFO] com.autocoding.lock.MyLockTest$1.run(MyLockTest.java:26) thead-3 com.autocoding.lock.MyLockTest 当前线程【thead-3】获取到锁,开始执行 2021-01-30 11:21:25 [INFO] com.autocoding.lock.MyLockTest$1.run(MyLockTest.java:31) thead-3 com.autocoding.lock.MyLockTest 当前线程【thead-3】释放锁,结束执行 2021-01-30 11:21:25 [INFO] com.autocoding.lock.MyLockTest$1.run(MyLockTest.java:26) thead-4 com.autocoding.lock.MyLockTest 当前线程【thead-4】获取到锁,开始执行 2021-01-30 11:21:30 [INFO] com.autocoding.lock.MyLockTest$1.run(MyLockTest.java:31) thead-4 com.autocoding.lock.MyLockTest 当前线程【thead-4】释放锁,结束执行 2021-01-30 11:21:30 [INFO] com.autocoding.lock.MyLockTest$1.run(MyLockTest.java:26) thead-5 com.autocoding.lock.MyLockTest 当前线程【thead-5】获取到锁,开始执行 2021-01-30 11:21:35 [INFO] com.autocoding.lock.MyLockTest$1.run(MyLockTest.java:31) thead-5 com.autocoding.lock.MyLockTest 当前线程【thead-5】释放锁,结束执行
-
Python多线程之间如何进行通信 threading
2019-12-31 13:44:34需要一个爬虫,爬虫线程从互联网爬取数据,将数据爬取下来之后,在由另外一个线程将爬取的数据写入文件或数据库,两个线程同时开多个拷贝。 2 容易犯的错误 一种错误写法: for i in range(ts): threads2.append...1 需求
需要一个爬虫,爬虫线程从互联网爬取数据,将数据爬取下来之后,在由另外一个线程将爬取的数据写入文件或数据库,两个线程同时开多个拷贝。
2 容易犯的错误
一种错误写法:
for i in range(ts): threads2.append(MakeData(qu=qu,fund_data_qu=fund_data_qu)) threads.append(ThreadToFile(fund_data_qu=fund_data_qu)) for i in range(ts): threads2[i].start() threads[i].start() for i in range(ts): threads[i].join() threads[i].join()
正确的写法应该是将其分开写:
for i in range(ts): threads2.append(MakeData(qu=qu,fund_data_qu=fund_data_qu)) for i in range(ts): threads2[i].start() for i in range(ts): threads2[i].join() for i in range(ts): threads.append(ThreadToFile(fund_data_qu=fund_data_qu)) for i in range(ts): threads[i].start() for i in range(ts): threads[i].join()
-
多个线程之间如何进行通信
2017-02-05 13:22:40在前一小节,介绍了在多线程编程中使用同步机制的重要性,并学会了如何实现同步的方法来正确地访问共享资源。这些线程之间的关系是平等的,彼此之间并不存在...那么,多个线程之间是如何进行通信的呢? 在现实在前一小节,介绍了在多线程编程中使用同步机制的重要性,并学会了如何实现同步的方法来正确地访问共享资源。这些线程之间的关系是平等的,彼此之间并不存在任何依赖,它们各自竞争CPU资源,互不相让,并且还无条件地阻止其他线程对共享资源的异步访问。然而,也有很多现实问题要求不仅要同步的访问同一共享资源,而且线程间还彼此牵制,通过相互通信来向前推进。那么,多个线程之间是如何进行通信的呢?
在现实应用中,很多时候都需要让多个线程按照一定的次序来访问共享资源,例如,经典的生产者和消费者问题。这类问题描述了这样一种情况,假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中的产品取走消费。如果仓库中没有产品,则生产者可以将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走为止。如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止。显然,这是一个同步问题,生产者和消费者共享同一资源,并且,生产者和消费者之间彼此依赖,互为条件向前推进。但是,该如何编写程序来解决这个问题呢?
传统的思路是利用循环检测的方式来实现,这种方式通过重复检查某一个特定条件是否成立来决定线程的推进顺序。比如,一旦生产者生产结束,它就继续利用循环检测来判断仓库中的产品是否被消费者消费,而消费者也是在消费结束后就会立即使用循环检测的方式来判断仓库中是否又放进产品。显然,这些操作是很耗费CPU资源的,不值得提倡。那么有没有更好的方法来解决这类问题呢?
首先,当线程在继续执行前需要等待一个条件方可继续执行时,仅有 synchronized 关键字是不够的。因为虽然synchronized关键字可以阻止并发更新同一个共享资源,实现了同步,但是它不能用来实现线程间的消息传递,也就是所谓的通信。而在处理此类问题的时候又必须遵循一种原则,即:对于生产者,在生产者没有生产之前,要通知消费者等待;在生产者生产之后,马上又通知消费者消费;对于消费者,在消费者消费之后,要通知生产者已经消费结束,需要继续生产新的产品以供消费。
其实,Java提供了3个非常重要的方法来巧妙地解决线程间的通信问题。这3个方法分别是:wait()、notify()和notifyAll()。它们都是Object类的最终方法,因此每一个类都默认拥有它们。
虽然所有的类都默认拥有这3个方法,但是只有在synchronized关键字作用的范围内,并且是同一个同步问题中搭配使用这3个方法时才有实际的意义。
这些方法在Object类中声明的语法格式如下所示:
final void wait() throws InterruptedException
final void notify()
final void notifyAll()其中,调用wait()方法可以使调用该方法的线程释放共享资源的锁,然后从运行态退出,进入等待队列,直到被再次唤醒。而调用notify()方法可以唤醒等待队列中第一个等待同一共享资源的线程,并使该线程退出等待队列,进入可运行态。调用notifyAll()方法可以使所有正在等待队列中等待同一共享资源的线程从等待状态退出,进入可运行状态,此时,优先级最高的那个线程最先执行。显然,利用这些方法就不必再循环检测共享资源的状态,而是在需要的时候直接唤醒等待队列中的线程就可以了。这样不但节省了宝贵的CPU资源,也提高了程序的效率。
由于wait()方法在声明的时候被声明为抛出InterruptedException异常,因此,在调用wait()方法时,需要将它放入try…catch代码块中。此外,使用该方法时还需要把它放到一个同步代码段中,否则会出现如下异常:
"java.lang.IllegalMonitorStateException: current thread not owner"
这些方法是不是就可以实现线程间的通信了呢?下面将通过多线程同步的模型: 生产者和消费者问题来说明怎样通过程序解决多线程间的通信问题。
具体步骤
下面这个程序演示了多个线程之间进行通信的具体实现过程。程序中用到了4个类,其中ShareData类用来定义共享数据和同步方法。在同步方法中调用了wait()方法和notify()方法,并通过一个信号量来实现线程间的消息传递。
// 例4.6.1 CommunicationDemo.java 描述:生产者和消费者之间的消息传递过程
class ShareData
{
private char c;
private boolean isProduced = false; // 信号量
public synchronized void putShareChar(char c) // 同步方法putShareChar()
{
if (isProduced) // 如果产品还未消费,则生产者等待
{
try
{
wait(); // 生产者等待
}catch(InterruptedException e){
e.printStackTrace();
}
}
this.c = c;
isProduced = true; // 标记已经生产
notify(); // 通知消费者已经生产,可以消费
}
public synchronized char getShareChar() // 同步方法getShareChar()
{
if (!isProduced) // 如果产品还未生产,则消费者等待
{
try
{
wait(); // 消费者等待
}catch(InterruptedException e){
e.printStackTrace();
}
}
isProduced = false; // 标记已经消费
notify(); // 通知需要生产
return this.c;
}
}
class Producer extends Thread // 生产者线程
{
private ShareData s;
Producer(ShareData s)
{
this.s = s;
}
public void run()
{
for (char ch = 'A'; ch <= 'D'; ch++)
{
try
{
Thread.sleep((int)(Math.random()*3000));
}catch(InterruptedException e){
e.printStackTrace();
}
s.putShareChar(ch); // 将产品放入仓库
System.out.println(ch + " is produced by Producer.");
}
}
}
class Consumer extends Thread // 消费者线程
{
private ShareData s;
Consumer(ShareData s)
{
this.s = s;
}
public void run()
{
char ch;
do{
try
{
Thread.sleep((int)(Math.random()*3000));
}catch(InterruptedException e){
e.printStackTrace();
}
ch = s.getShareChar(); // 从仓库中取出产品
System.out.println(ch + " is consumed by Consumer. ");
}while (ch != 'D');
}
}
class CommunicationDemo
{
public static void main(String[] args)
{
ShareData s = new ShareData();
new Consumer(s).start();
new Producer(s).start();
}
}上面的程序演示了生产者生产出A、B、C、D四个字符,消费者消费这四个字符的全过程,程序结果如图4.6.1所示:
图4.6.1 生产者和消费者举例 通过程序的运行结果可以看到,尽管在主方法中先启动了Consumer线程,但是,由于仓库中没有产品,因此,Consumer线程就会调用wait()方法进入等待队列进行等待,直到Producer线程将产品生产出来并放进仓库,然后使用notify()方法将其唤醒。
由于在两个线程中都指定了一定的休眠时间,因此也可能出现这样的情况:生产者将产品生产出来放入仓库,并通知等待队列中的Consumer线程,然而,由于休眠时间过长,Consumer线程还没有打算消费产品,此时,Producer线程欲生产下一个产品,结果由于仓库中的产品没有被消费掉,故Producer线程执行wait()方法进入等待队列等待,直到Consumer线程将仓库中的产品消费掉以后通过notify()方法去唤醒等待队列中的Producer线程为止。可见,两个线程之间除了必须保持同步之外,还要通过相互通信才能继续向前推进。
前面这个程序中,生产者一次只能生产一个产品,而消费者也只能一次消费一个产品。那么现实中也有这样的情况,生产者可以一次生产多个产品,只要仓库容量够大,就可以一直生产。而消费者也可以一次消费多个产品,直到仓库中没有产品为止。
但是,无论是生产产品到仓库,还是从仓库中消费,每一次都只能允许一个操作。显然,这也是个同步问题,只不过在这个问题中共享资源是一个资源池,可以存放多个资源。下面就以栈结构为例给出如何在这个问题中解决线程通信的程序代码。
// 例4.6.2 CommunicationDemo2.java
class SyncStack // 同步堆栈类,可以一次放入多个数据
{
private int index = 0; // 堆栈指针初始值为0
private char[] buffer = new char[5]; // 堆栈有5个字符的空间
public synchronized void push(char c) // 入栈同步方法
{
if(index == buffer.length) // 堆栈已满,不能入栈
{
try
{
this.wait(); //等待出栈线程将数据出栈
}catch(InterruptedException e){ }
}
buffer[index] = c; // 数据入栈
index++; // 指针加1,栈内空间减少
this.notify(); // 通知其他线程把数据出栈
}
public synchronized char pop() // 出栈同步方法
{
if(index == 0) // 堆栈无数据,不能出栈
{
try
{
this.wait(); //等待入栈线程把数据入栈
}catch(InterruptedException e){ }
}
this.notify(); //通知其他线程入栈
index--; //指针向下移动
return buffer[index]; //数据出栈
}
}
class Producer implements Runnable //生产者类
{
SyncStack s; //生产者类生成的字母都保存到同步堆栈中
public Producer(SyncStack s)
{
this.s = s;
}
public void run()
{
char ch;
for(int i=0; i<5; i++)
{
try
{
Thread.sleep((int)(Math.random()*1000));}catch(InterruptedException e){ }
ch =(char)(Math.random()*26+'A'); //随机产生5个字符
s.push(ch); //把字符入栈
System.out.println("Push "+ch+" in Stack"); // 打印字符入栈
}
}
}
class Consumer implements Runnable //消费者类
{
SyncStack s; //消费者类获得的字符都来自同步堆栈
public Consumer(SyncStack s)
{
this.s = s;
}
public void run()
{
char ch;
for(int i=0;i<5;i++)
{
try
{
Thread.sleep((int)(Math.random()*3000));
}catch(InterruptedException e){ }
ch = s.pop(); //从堆栈中读取字符
System.out.println("Pop "+ch+" from Stack"); //打印字符出栈
}
}
}
public class CommunicationDemo2
{
public static void main(String[] args)
{
SyncStack stack = new SyncStack();
//下面的消费者类对象和生产者类对象所操作的是同一个同步堆栈对象
Thread t1 = new Thread(new Producer(stack)); //线程实例化
Thread t2 = new Thread(new Consumer(stack)); //线程实例化
t2.start(); //线程启动
t1.start(); //线程启动
}
}程序中引入了一个堆栈数组buffer[]来模拟资源池,并使生产者类和消费者类都实现了Runnable接口,然后在主程序中通过前面介绍的方法创建两个共享同一堆栈资源的线程,并且有意先启动消费者线程,后启动生产者线程。请在阅读程序的时候仔细观察例4.6.1和本例的相似点以及区别之处,体会作者的用心。程序结果输出如图4.6.2所示:
图4.6.2 共享资源池的生产者和消费者问题 由于是栈结构,所以符合后进先出原则。有兴趣的读者还可以用符合先进先出原则的队列结构来模拟线程间通信的过程,相信可以通过查阅相关的资料来解决这个问题,在这里就不再给出程序代码了,作为一个思考题供读者练习。
专家说明
本小节介绍了三个重要的方法:wait()、notify()和notifyAll()。使用它们可以高效率地完成多个线程间的通信问题,这样在通信问题上就不必再使用循环检测的方法来等待某个条件的发生,因为这种方法是极为浪费CPU资源的,当然这种情况也不是所期望的。在例4.6.1中,为了更好地通信,引入了一个专门用来传递信息的信号量。利用信号量来决定线程是否等待无疑是一种非常安全的操作,值得提倡。此外,在例4.6.2中引入了资源池作为共享资源,并解决了在这种情况下如何实现多线程之间的通信问题。希望读者能够举一反三,编写出解决更加复杂问题的程序。
专家指点
可以肯定的是,合理地使用wait()、notify()和notifyAll()方法确实能够很好地解决线程间通信的问题。但是,也应该了解到这些方法是更复杂的锁定、排队和并发性代码的构件。尤其是使用 notify()来代替notifyAll()时是有风险的。除非确实知道每一个线程正在做什么,否则最好使用notifyAll()。其实,在JDK1.5中已经引入了一个新的包:java.util.concurrent 包,该包是一个被广泛使用的开放源码工具箱,里面都是有用的并发性实用程序。完全可以代替wait()和notify()方法用来编写自己的调度程序和锁。有关信息可以查阅相关资料,本书中不再赘述。
相关问题
Java提供了各种各样的输入输出流(stream),使程序员能够很方便地对数据进行操作。其中,管道(pipe)流是一种特殊的流,用于在不同线程间直接传送数据。一个线程发送数据到输出管道,另一个线程从输入管道中读出数据。通过使用管道,达到实现多个线程间通信的目的。那么,如何创建和使用管道呢?
Java提供了两个特殊的专门用来处理管道的类,它们就是PipedInputStream类和PipedOutputStream类。
其中,PipedInputStream代表了数据在管道中的输出端,也就是线程从管道读出数据的一端;PipedOutputStream代表了数据在管道中的输入端,也就是线程向管道写入数据的一端,这两个类一起使用就可以创建出数据输入输出的管道流对象。
一旦创建了管道之后,就可以利用多线程的通信机制对磁盘中的文件通过管道进行数据的读写,从而使多线程的程序设计在实际应用中发挥更大的作用
-
多线程之间如何实现通信?保证多线程线程安全进行
2020-07-24 23:53:04多线程安全通信问题 常用四种方式: - 休眠唤醒方式: - Object的 wait、notify、notifyAll - Condition的await、signal、signalAll - CountDownLatch:用于某个线程A等待若干个其他线程执行完之后,它才执行 - ...参考笔记:https://blog.csdn.net/qq_21556263/article/details/82759138
多线程通信问题
常用四种方式:
- 休眠唤醒方式: - Object的 wait、notify、notifyAll - Condition的await、signal、signalAll - CountDownLatch:用于某个线程A等待若干个其他线程执行完之后,它才执行 - CyclicBarrier:一组线程等待至某个状态之后再全部同时执行 - Semaphore:用于控制对某组资源的访问权限
案例:
1. 休眠唤醒方式
Object
的wait
、notify
、notifyAll
/** * 描述:<br> 线程间通信方式一 --- {@link Object} * 情况:两个线程打印数字 * 一个线程打印奇数、另一个打印偶数 * </> * @author 周志通 * @date 2020/7/19 23:08 **/ public class WaitNotifyRunnable{ private Object obj = new Object(); private Integer i=0; public void odd() { while(i<10){ synchronized (obj){ if(i%2 == 1){ System.out.println("奇数:"+i); i++; obj.notify(); } else { try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public void even(){ while(i<10){ synchronized (obj){ if(i%2 == 0){ System.out.println("偶数:"+i); i++; obj.notify(); } else { try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public static void main(String[] args){ final WaitNotifyRunnable runnable = new WaitNotifyRunnable(); Thread t1 = new Thread(new Runnable() { public void run() { runnable.odd(); } }, "偶数线程"); Thread t2 = new Thread(new Runnable() { public void run() { runnable.even(); } }, "奇数线程"); t1.start(); t2.start(); } }
Condition
的await
、signal
、signalAll
/** * 描述:<br> 线程间通信方式一 --- {@link Condition} * 情况:两个线程打印数字 * 一个线程打印奇数、另一个打印偶数 * </> * @author 周志通 * @date 2020/7/19 23:14 **/ public class WaitNotifyRunnable{ private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private Integer i=0; public void odd() { while(i<10){ lock.lock(); try{ if(i%2 == 1){ System.out.println("奇数:"+i); i++; condition.signal(); } else { condition.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } public void even(){ while(i<10){ lock.lock(); try{ if(i%2 == 0){ System.out.println("偶数:"+i); i++; condition.signal(); } else { condition.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } public static void main(String[] args){ final WaitNotifyRunnable runnable = new WaitNotifyRunnable(); Thread t1 = new Thread(new Runnable() { public void run() { runnable.odd(); } }, "偶数线程"); Thread t2 = new Thread(new Runnable() { public void run() { runnable.even(); } }, "奇数线程"); t1.start(); t2.start(); } }
总结:
Object
和Condition
休眠唤醒区别
- object wait()必须在synchronized(同步锁)下使用, - object wait()必须要通过Nodify()方法进行唤醒 - condition await() 必须和Lock(互斥锁/共享锁)配合使用 - condition await() 必须通过 signal() 方法进行唤醒
2.
CountDownLatch
方式介绍:
-
CountDownLatch
是在java1.5
被引入的,存在于java
.util
.concurrent
包下。 -
CountDownLatch
这个类能够使一个线程等待其他线程完成各自的工作后再执行。 -
CountDownLatch
是通过一个计数器来实现的,计数器的初始值为线程的数量。
案例:
import java.util.concurrent.CountDownLatch; /** * 描述:<br> 线程间通信方式二 --- {@link CountDownLatch} * 情况:三个运动员 + 一个教练 * 只有三个运动员准备就绪,教练才开始让运动员 同时跑步 * </> * @author 周志通 * @date 2020/7/19 23:21 **/ public class CountDownThreadDemo01 { private CountDownLatch countDownLatch = new CountDownLatch(3) ; // 设置三个远动员 // 运动员 public void athlete(){ System.out.println(Thread.currentThread().getName()+" ---> ~~~1. 开始准备~~~~") ; try { Thread.sleep(1000) ; } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" ---> ~~~2. 准备就绪~~~~") ; // 准备 countDownLatch.countDown() ; } public void coach(){ System.out.println(Thread.currentThread().getName()+" ---> ~~~运动员准备~~~~") ; // 等待 3个运动员 准备就绪 try { countDownLatch.await() ; } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" ---> ~~~所有运动员开始训练~~~~") ; } public static void main(String[] args) { final CountDownThreadDemo01 count = new CountDownThreadDemo01() ; Thread t1 = new Thread(count::athlete,"运动员1") ; Thread t2 = new Thread(count::athlete,"运动员2") ; Thread t3 = new Thread(count::athlete,"运动员3") ; Thread t4 = new Thread(count::coach,"教练") ; t4.start() ; t1.start() ; t2.start() ; t3.start() ; } }
总结:
- 每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
3.
CyclicBarrier
方式介绍:
- CyclicBarrier是在java1.5被引入的,存在于java.util.concurrent包下。 - CyclicBarrier实现让一组线程等待至某个状态之后再全部同时执行。 - CyclicBarrier底层是基于RentranLock和Condition实现。
案例:
/** * 描述:<br> 线程通信方式三 --- {@link CyclicBarrier} 方式 * 场景:三个运动员【三个线程】开始训练 * 要求:等带三个运动员同时准备就绪时,开始训练 * </> * @author 周志通 * @date 2020/7/20 8:26 **/ public class CyclicBarrierDemo01 { private CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ; // 设置运动员的数量 public void startThread(){ String name = Thread.currentThread().getName() ; System.out.println(name + " --> 开始准备~~~" ) ; try { Thread.sleep(2*1000) ; cyclicBarrier.await() ; } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + " --> 准备完成~~~开始训练" ) ; } public static void main(String[] args) { CyclicBarrierDemo01 cb = new CyclicBarrierDemo01() ; Thread t1 = new Thread(cb::startThread,"运动员1") ; Thread t2 = new Thread(cb::startThread,"运动员2") ; Thread t3 = new Thread(cb::startThread,"运动员3") ; t1.start(); t2.start(); t3.start(); } }
4. Semaphore方式
介绍:
- Semaphore是在java1.5被引入的,存在于java.util.concurrent包下。 - Semaphore用于控制对某组资源的访问权限。
案例:工人使用机器工作
/** * 描述:<br>线程间通信方式四 --- {@link Semaphore} * 场景:三台机器、多个工人使用机器 * 要求:一台机器不能同时被多个工人一起使用</> * @author 周志通 * @version 1.0.0 * @date 2020/7/20 9:15 **/ public class SemaphoreDemo { static class Machine implements Runnable{ private int num; private Semaphore semaphore; public Machine(int num, Semaphore semaphore) { this.num = num; this.semaphore = semaphore; } public void run() { try { semaphore.acquire();//请求机器 System.out.println("工人"+this.num+"请求机器,正在使用机器"); Thread.sleep(1000); System.out.println("工人"+this.num+"使用完毕,已经释放机器"); semaphore.release();//释放机器 } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args){ int worker = 8;//工人数 Semaphore semaphore = new Semaphore(3);//机器数 for (int i=0; i< worker; i++){ new Thread(new Machine(i, semaphore)).start(); } } }
-
多个线程之间是如何进行通信的呢?
2016-10-16 10:06:31在现实应用中,很多时候都需要让多个线程按照一定的次序来访问共享资源,例如,经典的生产者和消费者问题。这类问题描述了这样一种情况,假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库... -
Java线程之间如何通信
2018-04-01 08:59:38线程之间的通信给线程带来了巨大的价值 Volatile与Synchronized关键字 问题引出: 使用volatile修饰int型变量i,多个线程同时进行i++操作,这样可以实现线程安全吗? volatile修饰的变量具有可见性。可见性也... -
如何建立多线程之间的消息通信
2008-09-26 15:13:00开发服务器程序经常要涉及到多线程之间进行消息通信,比如子线程要通知父线程要退出,或一些其他的信息. 因为服务器的多线城都是工作线程,是没有CWnd,所以如果要接收消息,必须自己建立消息队列来获取消息,同时... -
Linux线程之间是如何通信的?
2020-11-02 01:42:14相对于进程之间的通信 线程之间的通信简单的多。 其通信方式有以下几种: 锁机制:包括互斥锁、条件变量、读写锁 互斥锁提供了以排他方式防止数据结构被并发修改的方法。 读写锁允许多个线程同时读共享数据,而对写... -
java多线程之间数据通信
2021-02-09 16:57:24我们知道Java每个线程之间是数据隔离的,那在多线程环境下,两个线程之间,如何进行数据传输呢 下面我们以main线程中新起一个子线程的方式,来模拟两个线程之间数据通信的场景。 使用共享变量/对象 private static ... -
多线程之间消息通信
2013-10-18 10:13:12演示了如何使用自定义消息进行线程间通信,如何在线程间传递消息,可运行,VC历程 -
Win32多线程之线程之间的通信
2013-11-16 00:47:47线程之间如何通信? 一个和race conditions很有关系的问题就是,线程通信问题。一个线程要有大用途,你i必须告诉它做什么事情。Win32提供了一个简单的方法,供应线程的启动(start-up)信息。但是两个线程间的... -
QT5自定义信号与槽进行多线程之间的通信
2019-07-24 00:29:13最近在使用QT的多线程的时候,遇到了一个问题:如何在子线程中操作UI主线程的控件呢?比如我子线程中接收到串口的数据变化后,需要更新界面的LcdNumber的数字,但子线程又不能直接操作UI控件。 为此我想了两个办法... -
多线程之间的通信(转)
2013-03-30 10:38:57问题 线程之间的关系是平等的,彼此之间并不存在任何...那么,多个线程之间是如何进行通信的呢? 解决思路 在现实应用中,很多时候都需要让多个线程按照一定的次序来访问共享资源,例如,经典的生产者和消费 -
java如何进行线程的通信_线程如何在Java中相互通信?
2021-03-14 22:46:13线程间通信使线程之间的通信不断发展。用于在Java中实现线程间通信的三种方法等待()此方法使当前线程释放锁。直到完成特定时间段或另一个线程为此对象调用notify()或notifyAll()方法,该操作才完成。通知()此方法从... -
java代码进程或线程通信的实现_如何在学习Java过程中实现线程之间的通信!
2021-02-28 19:30:21wait,notify 和 notifyAll,这些在多线程中被经常用到的保留关键字,在实际开发的时候很多时候却并没有被大家重视,而本文则是对这些关键字的使用进行描述。存在即合理在java中,每个对象都有两个池,锁池(monitor)...