精华内容
下载资源
问答
  • 主要介绍了Java 线程通信的的相关资料,文中讲解非常细致,代码帮助大家更好的理解和学习,感兴趣的朋友可以了解下
  • 主要介绍了Java等待唤醒机制线程通信原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 主要介绍了Java线程通信不同步问题,结合实例形式分析了java线程通信不同步问题的原理并模拟实现了线程间通信不同步情况下的异常输出,需要的朋友可以参考下
  • Java 常见线程通信生产消费案例演示

    千次阅读 2020-03-18 13:02:51
    介绍了Java线程通信原理、目的、方法,在文章最后给出了多个线程通信案例。

    介绍了Java线程通信的原理、目的、方法,在文章最后给出了多个线程通信案例。

    1 为什么要线程通信?

    多个线程并发执行时,在默认情况下CPU是随机切换线程的,当我们需要多个线程来共同完成一件任务,并且我们希望他们有规律的执行,那么多线程之间需要一些协调通信,以此来帮我们达到多线程共同操作一份数据。

    狭义上来说:线程通信的目标是使线程间能够互相发送信号(通知),另一方面,线程通信使线程能够等待其他线程的信号(通知),也称为线程间的等待/通知机制,或者生产消费模式!

    广义上说:能够协调线程调度运行的方法都属于线程通信的应用,不应是一个线程主动通知另外一个线程,这个通知还可能是一个公共信号。

    2 线程通信的方式

    1. synchronized 线程通信
      1. synchronized + wait + notify + notifyAll
    2. 使用lock+Condition控制线程通信
      1. JDK1.5开始,Lock可以代替synchronized 同步方法或同步代码块,Condition替代同步监视器的功能:lock + condition + await + signal + signalAll
    3. 使用阻塞队列(BlockingQueue)控制线程通信
      1. BlockingQueue接口主要作为线程同步的工具。当生产者试图向BlockingQueue中放入元素,如果队列已满,则线程被阻塞;当消费者试图向BlockingQueue中取出元素时,若该队列已空,则线程被阻塞。这里通过共享一个队列的信息,实现生产者和消费者。
    4. 使用管道流PipedWriter/PipedReader
    5. 使用JDK1.5提供的信号量Semaphore、CountDownLatch、CyclicBarrier等工具类
    6. volatile
      1. volatile能保证所修饰的变量对于多个线程可见性,即只要被修改,其它线程读到的一定是最新的值。以此来实现线程通信,但是volatile不能保证操作原子性,是一种弱的同步机制。

    3 线程通信的案例

    3.1 synchronized实现生产消费

    下面的案例使用synchronized实现了多生产者多消费者的案例

    public class ProductionAndConsumption {
        public static void main(String[] args) {
            Resource resource = new Resource();
            Thread thread1 = new Thread(new Producer(resource), "生产者1");
            Thread thread2 = new Thread(new Producer(resource), "生产者2");
            Thread thread3 = new Thread(new Consumer(resource), "消费者1");
            Thread thread4 = new Thread(new Consumer(resource), "消费者2");
            thread1.start();
            thread2.start();
            thread3.start();
            thread4.start();
    
        }
    }
    
    /**
     * 表示产品资源
     */
    class Resource {
        //标号
        private int count;
        //名字
        private String name;
    
        //标志位,false表示没有产品,trur表示生产出了产品
        private boolean flag;
    
        /**
         * 生产产品
         * @param name
         */
        void set(String name) {
            //使用同步块
            synchronized (this) {
                //判断false是否为true,如果是true说明有产品了,那么生产者线程应该等待
                if (flag) {
                    try {
                        System.out.println("有产品了--" + Thread.currentThread().getName() + "生产等待");
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //走到这一步,说明没有产品,可以生产
                this.name = name;
                System.out.println(Thread.currentThread().getName() + "--" + this.name + (++count) + "生产");
                //设置产品标志为true,表示有产品了,可以消费了
                flag = true;
                //这里唤醒所有线程,有可能还会唤醒生产者
                this.notifyAll();
            }
        }
    
        /**
         * 消费产品
         */
        void get() {
            synchronized (this) {
                //判断flag是否为false,如果是fasle说明有产品了,那么消费者线程应该等待
                if (!flag) {
                    try {
                        System.out.println("没产品了--" + Thread.currentThread().getName() + "消费等待");
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //走到这一步,说明有产品,可以消费
                System.out.println(Thread.currentThread().getName() + "--" + this.name + count + "消费");
                //设置产品标志为false,表示没有产品了,可以生产了
                flag = false;
                //这里唤醒所有线程,有可能还会唤醒消费者
                this.notifyAll();
            }
        }
    }
    
    /**
     * 生产者线程
     */
    class Producer implements Runnable {
        private Resource resource;
    
        public Producer(Resource resource) {
            this.resource = resource;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //调用生产方法
                resource.set("面包");
            }
        }
    }
    
    /**
     * 消费者线程
     */
    class Consumer implements Runnable {
        private Resource resource;
    
        public Consumer(Resource resource) {
            this.resource = resource;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //调用消费方法
                resource.get();
            }
        }
    }
    
    

    3.2 Lock实现生产消费

    public class LockPC {
        public static void main(String[] args) {
            Resource resource = new Resource();
            Producer producer = new Producer(resource);
            Consumer consumer = new Consumer(resource);
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
            threadPoolExecutor.execute(producer);
            threadPoolExecutor.execute(consumer);
            threadPoolExecutor.execute(producer);
            threadPoolExecutor.execute(consumer);
            threadPoolExecutor.shutdown();
        }
    
        /**
         * 产品资源
         */
        static class Resource {
            private String name;
            private int count;
            boolean flag;
            //获取lock锁,lock锁的获取和释放需要代码手动操作
            ReentrantLock lock = new ReentrantLock();
            //从lock锁获取一个condition,用于生产者线程在此等待和唤醒
            Condition producer = lock.newCondition();
            //从lock锁获取一个condition,用于消费者线程在此等待和唤醒
            Condition consumer = lock.newCondition();
    
            void set(String name) {
                //获得锁
                lock.lock();
                try {
                    while (flag) {
                        try {
                            System.out.println("有产品了--" + Thread.currentThread().getName() + "生产等待");
                            //该生产者线程,在producer上等待
                            producer.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    ++count;
                    this.name = name;
                    System.out.println(Thread.currentThread().getName() + "生产了" + this.name + +count);
                    flag = !flag;
                    //唤醒在consumer上等待的消费者线程,这样不会唤醒等待的生产者
                    consumer.signalAll();
                } finally {
                    //释放锁
                    lock.unlock();
                }
            }
    
            void get() {
                lock.lock();
                try {
                    while (!flag) {
                        try {
                            System.out.println("没产品了--" + Thread.currentThread().getName() + "消费等待");
                            //该消费者线程,在consumer上等待
                            consumer.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(Thread.currentThread().getName() + "消费了" + this.name + count);
                    flag = !flag;
                    //唤醒在producer监视器上等待的生产者线程,这样不会唤醒等待的消费者
                    producer.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        }
    
        /**
         * 消费者
         */
        static class Consumer implements Runnable {
            private Resource resource;
    
            public Consumer(Resource resource) {
                this.resource = resource;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //调用消费方法
                    resource.get();
                }
            }
        }
    
        /**
         * 生产者
         */
        static class Producer implements Runnable {
            private Resource resource;
    
            public Producer(Resource resource) {
                this.resource = resource;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //调用生产方法
                    resource.set("面包");
                }
            }
        }
    }
    

    3.3 Lock实现生产消费和产品仓库的功能

    在上一个案例中,生产者线程生产的产品必须马上被消费,在下面的案例中,生产的产品可以被累积,使用Lock实现了多生产者多消费者的案例,同时实现了商品仓库的功能,使得生产者可以连续生产,消费者可以连续消费。

    public class LockPCWhithStorage {
        public static void main(String[] args) {
            Resource resource = new Resource();
            Consumer c = new Consumer(resource);
            Producer p = new Producer(resource);
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
            threadPoolExecutor.execute(p);
            threadPoolExecutor.execute(p);
            threadPoolExecutor.execute(c);
            threadPoolExecutor.execute(c);
            threadPoolExecutor.shutdown();
        }
    
        static class Resource {
            // 获得锁对象
            final Lock lock = new ReentrantLock();
            // 获得生产监视器
            final Condition notFull = lock.newCondition();
            // 获得消费监视器
            final Condition notEmpty = lock.newCondition();
    
            // 定义一个数组,当作仓库,用来存放商品
            final Object[] items = new Object[100];
            /*
             * 取消了falg标志,取而代之的是用仓库的数量来判断是否应该阻塞或者唤醒对应的线程
             * putpur:生产者使用的下标索引;
             * takeptr:消费者下标索引;
             * count:用计数器,记录商品个数
             */
            int putptr, takeptr, count;
    
            // 生产方法
            public void put(Object x) {
                // 获得锁
                lock.lock();
                try {
                    // 如果商品个数等于数组的长度,商品满了将生产将等待消费者消费
                    while (count == items.length) {
                        try {
                            System.out.println("仓库满了--" + Thread.currentThread().getName() + "生产等待");
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 生产索引对应的商品,放在仓库中
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    items[putptr] = x;
                    // 如果下标索引加一等于数组长度,将索引重置为0,重新开始
                    if (++putptr == items.length) {
                        putptr = 0;
                    }
                    // 商品数加1
                    ++count;
                    System.out.println(Thread.currentThread().getName() + "生产了" + x + "共有" + count + "个");
                    // 唤醒消费线程
                    notEmpty.signal();
                } finally {
                    // 释放锁
                    lock.unlock();
                }
            }
    
            // 消费方法
            public Object take() {
                //获得锁
                lock.lock();
                try {
                    //如果商品个数为0.消费等待
                    while (count == 0) {
                        try {
                            System.out.println("仓库空了--" + Thread.currentThread().getName() + "消费等待");
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    //获得对应索引的商品,表示消费了
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Object x = items[takeptr];
                    //如果索引加一等于数组长度,表示取走了最后一个商品,消费完毕
                    if (++takeptr == items.length)
                    //消费索引归零,重新开始消费
                    {
                        takeptr = 0;
                    }
                    //商品数减一
                    --count;
                    System.out.println(Thread.currentThread().getName() + "消费了" + x + "还剩" + count + "个");
                    //唤醒生产线程
                    notFull.signal();
                    //返回消费的商品
                    return x;
                } finally {
                    //释放锁
                    lock.unlock();
                }
            }
        }
    
    
        static class Producer implements Runnable {
    
            private Resource resource;
    
            public Producer(Resource resource) {
                this.resource = resource;
            }
    
            @Override
            public void run() {
                while (true) {
                    resource.put("面包");
                }
            }
        }
    
        static class Consumer implements Runnable {
            private Resource resource;
    
            public Consumer(Resource resource) {
                this.resource = resource;
            }
    
    
            @Override
            public void run() {
                while (true) {
                    resource.take();
                }
            }
        }
    }
    

    3.4 输出ABCABC

    编写一个程序,开启3 个线程,这三个线程的name分别为A、B、C,每个线程将自己的名字 在屏幕上打印10 遍,要求输出的结果必须按名称顺序显示。

    public class PrintABC {
        ReentrantLock lock = new ReentrantLock();
        Condition A = lock.newCondition();
        Condition B = lock.newCondition();
        Condition C = lock.newCondition();
        private int flag = 1;
    
        public void printA(int i) {
            lock.lock();
            try {
                while (flag != 1) {
                    try {
                        A.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + " " + i);
                flag = 2;
                B.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public void printB(int i) {
            lock.lock();
            try {
                while (flag != 2) {
                    try {
                        B.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + " " + i);
                flag = 3;
                C.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public void printC(int i) {
            lock.lock();
            try {
                while (flag != 3) {
                    try {
                        C.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + " " + i);
                System.out.println("---------------------");
                flag = 1;
                A.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) {
            PrintABC testABC = new PrintABC();
            Thread A = new Thread(new A(testABC), "A");
            Thread B = new Thread(new B(testABC), "B");
            Thread C = new Thread(new C(testABC), "C");
            A.start();
            B.start();
            C.start();
        }
    
        static class A implements Runnable {
            private PrintABC testABC;
    
            public A(PrintABC testABC) {
                this.testABC = testABC;
            }
    
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    testABC.printA(i + 1);
                }
            }
        }
    
        static class B implements Runnable {
            private PrintABC testABC;
    
            public B(PrintABC testABC) {
                this.testABC = testABC;
            }
    
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    testABC.printB(i + 1);
                }
            }
        }
    
        static class C implements Runnable {
            private PrintABC testABC;
    
            public C(PrintABC testABC) {
                this.testABC = testABC;
            }
    
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    testABC.printC(i + 1);
                }
            }
        }
    }
    

    3.5 使用高级阻塞队列实现生产消费

    前面的案例都是使用的比较原始的方法,适合初学者,这里使用高级阻塞队列实现通知等待/生产消费。

    public class BlockingQueuePC {
        //定义一个阻塞队列
        static LinkedBlockingQueue<Object> objects = new LinkedBlockingQueue<>();
    
        public static void main(String[] args) {
            Resource resource = new Resource("面包");
            Consumer consumer = new Consumer();
            Producer producer = new Producer(resource);
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
            //启动多个生产者\消费者线程
            threadPoolExecutor.execute(producer);
            threadPoolExecutor.execute(consumer);
            threadPoolExecutor.execute(producer);
            threadPoolExecutor.execute(consumer);
            threadPoolExecutor.execute(consumer);
            threadPoolExecutor.shutdown();
        }
    
        /**
         * 消费者
         */
        static class Consumer implements Runnable {
    
            public Object take() throws InterruptedException {
                return objects.take();
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        Object take = take();
                        System.out.println(Thread.currentThread().getName() + "消费了" + take + ",还剩" + objects.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 生产者
         */
        static class Producer implements Runnable {
            Resource resource;
    
            public Producer(Resource resource) {
                this.resource = resource;
            }
    
            public void put(Object o) throws InterruptedException {
                objects.put(o);
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        put(resource);
                        System.out.println(Thread.currentThread().getName() + "生产了" + resource + ",还剩" + objects.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 产品/资源
         */
        static class Resource {
            String name;
            public Resource(String name) {
                this.name = name;
            }
    
            @Override
            public String toString() {
                return name;
            }
        }
    }
    

    我们的代码非常简单,并没有使用任何同步,那么如果做到线程安全和通信的呢,实际上这些活都被阻塞队列帮我们做了,对比上一个手动实现的生产消费+仓库案例,这个是不是简单得多,效率也更高呢?这些都是属于JUC包中的内容,想要学好Java并发,JUC是所有人绕不过去的坎!

    3.6 使用Semaphore信号量

    Semaphore是JDK1.5出现的类,翻译成字面意思为“信号量”,属于JUC,是synchronized的加强版,可以用来控制线程的并发数量。
    Semaphore可以控制同时访问共享资源的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。它通过协调各个线程,以保证合理的使用公共资源。相比synchronized和lock锁一次只能允许一个线程访问资源,功能更加强大。
    案例:若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:

    public class Worker extends Thread {
        private int num;
        private Semaphore semaphore;
    
        public Worker(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }
    
        public static void main(String[] args) {
            //工人数
            int N = 8; 
            //机器数目 许可=5
            Semaphore semaphore = new Semaphore(5);
            for (int i = 0; i < N; i++)
                new Worker(i, semaphore).start();
        }
    
        @Override
        public void run() {
            try {
                //获取permits个许可,若无许可能够获得,则会一直等待,直到获得许可。
                semaphore.acquire();
                System.out.println("工人" + this.num + "占用一个机器在生产...");
                Thread.sleep(2000);
                System.out.println("工人" + this.num + "释放出机器");
                //释放许可。注意,在释放许可之前,必须先获获得许可。
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    3.7 使用管道流

    对于Piped类型的流,必须先要进行绑定,即调用connect方法,如果没有绑定,那么将会抛出异常。管倒流通信类似于聊天室。用的比较少

    public class PipeTest {
        public static void main(String[] args) throws IOException {
            PipedWriter pipedWriter = new PipedWriter();
            PipedReader pipedReader = new PipedReader();
            pipedWriter.connect(pipedReader);
            Thread printThread = new Thread(new Print(pipedReader), "PrintThread");
            printThread.start();
            int receive = 0;
            try {
                while ((receive = System.in.read()) != -1) {
                    pipedWriter.write(receive);
                }
            } finally {
                pipedWriter.close();
            }
        }
    
    
        static class Print implements Runnable {
    
            private PipedReader in;
    
            public Print(PipedReader in) {
                this.in = in;
            }
    
            @Override
            public void run() {
                int receive = 0;
                try {
                    while ((receive = in.read()) != -1) {
                        System.out.print((char) receive);
                    }
                } catch (IOException e) {
    
                }
            }
        }
    }
    
    

    参考资料:

    1. 《实战Java高并发程序设计(第2版) 》
    2. 《Java并发编程的艺术》
    3. 《Java并发编程之美》

    如果有什么不懂或者需要交流,可以留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!

    展开全文
  • java实现线程通信的几种方式

    万次阅读 2020-05-30 19:34:35
    比如大家熟知的消息中间件的实现,从某种角度上讲,就借助了多线程通信的思想,下面总结了JDK中常用的几种实现线程通信的方式,提供参考 1、synchronized实现方式 可能很多小伙伴们会有疑问,synchronized是对共享...

    前言

    在多线程的世界里,线程与线程之间的交互无处不在,只不过在平时的开发过程中,大多数情况下,我们都在单线程的模式下进行编码,即使有,也直接借助框架自身的机制实现了,其实线程之间的通信在JDK中是一个比较深的问题,比如大家熟知的消息中间件的实现,从某种角度上讲,就借助了多线程通信的思想,下面总结了JDK中常用的几种实现线程通信的方式,提供参考

    1、synchronized实现方式

    可能很多小伙伴们会有疑问,synchronized是对共享资源加锁使用的,怎么和线程通信扯在一起呢?这里纠正一个小小的偏见,也是我近期才矫正过来的

    我们要弄明白的一点是,为什么会存在线程通讯这个问题呢?根据一些技术大牛们的说法就是,多个线程之间需要相互传递一些参数、变量或者是各个线程的执行需要互相依赖各自的结果,比如我们熟知的生产者消费者模式,只有生产者生产出来了东西,消费者才能进行消费啊

    这里模拟假如有2个线程,需要操作一个共享资源,即修改共享资源的数据,使用synchronized的方式如下:

    public class SycDemo1 {
    
        private static Object lock = new Object();
    
        private static String weather = "sunny";
    
        public static void main(String[] args) {
    
            new Thread(()->{
                synchronized (lock){
                    System.out.println("
    展开全文
  • JAVA线程通信详解

    万次阅读 多人点赞 2018-10-14 09:11:36
     线程线程之间不是相互独立的个体,它们彼此之间需要相互通信和协作,最典型的例子就是生产者-消费者问题:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对...

    目录

    一、概述

    二、wait/notify 机制

    三、Condition

    四、生产者/消费者模式

    五、线程间的通信——管道

    六、方法Join的使用


    一、概述

           线程与线程之间不是相互独立的个体,它们彼此之间需要相互通信和协作,最典型的例子就是生产者-消费者问题:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权。因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去。因此一般情况下,当队列满时,会让生产者交出对临界资源的占用权,并进入挂起状态。然后等待消费者消费了商品,然后消费者通知生产者队列有空间了。同样地,当队列空时,消费者也必须等待,等待生产者通知它队列中有商品了。这种互相通信的过程就是线程间的协作。本文首先介绍 wait/notify 机制,并对实现该机制的两种方式——synchronized+wait-notify模式和Lock+Condition模式进行详细剖析,以作为线程间通信与协作的基础。进一步地,以经典的生产者-消费者问题为背景,熟练对 wait/notify 机制的使用。最后对 Thread 类中的 join() 方法进行源码分析,并以宿主线程与寄生线程的协作为例进行说明。在下面的例子中,虽然两个线程实现了通信,但是凭借线程B不断地通过while语句轮询来检测某一个条件,这样会导致CPU的浪费。因此,需要一种机制来减少CPU资源的浪费,而且还能实现多个线程之间的通信,即 wait/notify 机制。

    //资源类
    class MyList {
    
        //临界资源
        private volatile List<String> list = new ArrayList<String>();
    
        public void add() {
            list.add("abc");
        }
    
        public int size() {
            return list.size();
        }
    }
    
    // 线程A
    class ThreadA extends Thread {
    
        private MyList list;
    
        public ThreadA(MyList list,String name) {
            super(name);
            this.list = list;
        }
    
        @Override
        public void run() {
            try {
                for (int i = 0; i < 3; i++) {
                    list.add();
                    System.out.println("添加了" + (i + 1) + "个元素");
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    //线程B
    class ThreadB extends Thread {
    
        private MyList list;
    
        public ThreadB(MyList list,String name) {
            super(name);
            this.list = list;
        }
    
        @Override
        public void run() {
            try {
                while (true) {          // while 语句轮询
                    if (list.size() == 2) {
                        System.out.println("==2了,线程b要退出了!");
                        throw new InterruptedException();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    //测试
    public class Test {
        public static void main(String[] args) {
    
            MyList service = new MyList();
    
            ThreadA a = new ThreadA(service,"A");
            ThreadB b = new ThreadB(service,"B");
    
            a.start();
            b.start();
        }
    }
    /* Output(输出结果不唯一): 
       添加了1个元素
       添加了2个元素
       ==2了,线程b要退出了!
       java.lang.InterruptedException at test.ThreadB.run(Test.java:57)
       添加了3个元素
     */

    二、wait/notify 机制

           在这之前,线程间通过共享数据来实现通信,即多个线程主动地读取一个共享数据,通过 同步互斥访问机制保证线程的安全性。等待/通知机制主要由Object类中的wait()、notify() 和 notifyAll()三个方法来实现,这三个方法均非Thread类中所声明的方法,而是Object类中声明的方法。原因是每个对象都拥有monitor(锁),所以让当前线程等待某个对象的锁,当然应该通过这个对象来操作,而不是用当前线程来操作,因为当前线程可能会等待多个线程的锁,如果通过线程来操作,就非常复杂了。

    1.wait()——让当前线程 (Thread.concurrentThread() 方法所返回的线程) 释放对象锁并进入等待(阻塞)状态。

    • 方法声明:public final native void wait(long timeout) throws InterruptedException;
    • 方法作用:Causes the current thread to wait until either another thread invokes the notify() method or the notifyAll() method for this object, or a specified amount of time has elapsed. If any threads are waiting on this object, one of them is chosen to be awakened. The choice is arbitrary and occurs at the discretion of the implementation. A thread waits on this object’s monitor by calling one of the wait() methods.This method causes the current thread (call it T):① to place itself in the wait set for this object;② to relinquish (放弃) any and all synchronization claims on this object;③ Thread T becomes disabled for thread scheduling purposes and lies dormant (休眠) until one of four things happens:Some other thread invokes the notify method for this object and thread T happens to be arbitrarily chosen as the thread to be awakened;Some other thread invokes the notifyAll method for this object;Some other thread interrupts thread T;The specified amount of real time has elapsed, more or less. If timeout is zero, however, then real time is not taken into consideration and the thread simply waits until notified (等待时间为 0 意味着永远等待,直至线程被唤醒) .The thread T is then removed from the wait set for this object and re-enabled for thread scheduling. It then competes in the usual manner with other threads for the right to synchronize on the object; once it has gained control of the object, all its synchronization claims on the object are restored to the status quo ante - that is, to the situation as of the time that the wait method was invoked. Thread T then returns from the invocation of the wait method. Thus, on return from the wait method, the synchronization state of the object and of thread T is exactly as it was when the wait method was invoked.
    • 方法使用条件:This method should only be called by a thread that is the owner of this object’s monitor.
    • 异常:运行时(不受检查)异常 IllegalMonitorStateException: if the current thread is not the owner of this object’s monitor; IllegalArgumentException: if the value of timeout is negative;受检查异常 (中断阻塞线程,抛 InterruptedException并终止线程,释放锁,释放CPU) InterruptedException: if any thread interrupted the current thread before or while the current thread was waiting for a notification. The interrupted status of the current thread is cleared when this exception is thrown.

    2.notify()——唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。

    • 方法声明:public final native void notify();
    • 方法作用:Wakes up a single thread that is waiting on this object’s monitor. If any threads are waiting on this object, one of them is chosen to be awakened. The choice is arbitrary and occurs at the discretion of the implementation.The awakened thread will not be able to proceed until the current thread relinquishes (放弃) the lock on this object.(在执行 notify() 方法后,当前线程不会马上释放该锁对象,呈 wait 状态的线程也并不能马上获取该对象锁。只有等到执行notify()方法的线程退出synchronized代码块/方法后,当前线程才会释放锁,而相应的呈wait状态的线程才可以去争取该对象锁。) The awakened thread will compete in the usual manner with any other threads that might be actively competing to (竞争) synchronize on this object; the awakened thread enjoys no reliable privilege or disadvantage in being the next thread to lock this object.
    • 方法使用条件This method should only be called by a thread that is the owner of this object’s monitor. A thread becomes the owner of the object’s monitor in one of three ways:By executing a synchronized instance method of that object;By executing the body of a synchronized statement that synchronizes on the object;For objects of type Class, by executing a synchronized static method of that class.Only one thread at a time can own an object’s monitor(互斥锁).
    • 异常:运行时(不受检查)异常 IllegalMonitorStateException: if the current thread is not the owner of this object’s monitor.

    3.notifyAll()——唤醒所有正在等待相应对象锁的线程,使它们进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。

    • 方法声明:public final native void notifyAll();
    • 方法作用:Wakes up all threads that are waiting on this object’s monitor. A thread waits on an object’s monitor by calling one of the wait methods.The awakened threads will not be able to proceed until the current thread relinquishes (放弃) the lock on this object. The awakened threads will compete in the usual manner with any other threads that might be actively competing to (竞争) synchronize on this object; the awakened threads enjoy no reliable privilege or disadvantage in being the next thread to lock this object.
    • 方法使用条件:This method should only be called by a thread that is the owner of this object’s monitor.
    • 异常:运行时(不受检查)异常 IllegalMonitorStateException: if the current thread is not the owner of this object’s monitor.

    4.小结

            从以上描述可以得出:wait()、notify() 和 notifyAll()方法是 本地方法,并且为 final 方法,无法被重写;调用某个对象的 wait() 方法能让 当前线程阻塞,并且当前线程必须拥有此对象的monitor(即锁);调用某个对象的 notify() 方法能够唤醒 一个正在等待这个对象的monitor的线程,如果有多个线程都在等待这个对象的monitor,则只能唤醒其中一个线程;调用notifyAll()方法能够唤醒所有正在等待这个对象的monitor的线程。

    • 方法调用与线程状态关系

      每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了已就绪(将要竞争锁)的线程,阻塞队列存储了被阻塞的线程。当一个阻塞线程被唤醒后,才会进入就绪队列,进而等待CPU的调度;反之,当一个线程被wait后,就会进入阻塞队列,等待被唤醒。

    Threadæ¹æ³ä¸ç¶æ.jpg-73.5kB       

    • 使用举例
    public class Test {
        public static Object object = new Object();
    
        public static void main(String[] args) throws InterruptedException {
            Thread1 thread1 = new Thread1();
            Thread2 thread2 = new Thread2();
    
            thread1.start();
    
            Thread.sleep(2000);
    
            thread2.start();
        }
    
        static class Thread1 extends Thread {
            @Override
            public void run() {
                synchronized (object) {
                    System.out.println("线程" + Thread.currentThread().getName()
                            + "获取到了锁...");
                    try {
                        System.out.println("线程" + Thread.currentThread().getName()
                                + "阻塞并释放锁...");
                        object.wait();
                    } catch (InterruptedException e) {
                    }
                    System.out.println("线程" + Thread.currentThread().getName()
                            + "执行完成...");
                }
            }
        }
    
        static class Thread2 extends Thread {
            @Override
            public void run() {
                synchronized (object) {
                    System.out.println("线程" + Thread.currentThread().getName()
                            + "获取到了锁...");
                    object.notify();
                    System.out.println("线程" + Thread.currentThread().getName()
                            + "唤醒了正在wait的线程...");
                }
                System.out
                        .println("线程" + Thread.currentThread().getName() + "执行完成...");
            }
        }
    }
    /* Output: 
            线程Thread-0获取到了锁...
            线程Thread-0阻塞并释放锁...
            线程Thread-1获取到了锁...
            线程Thread-1唤醒了正在wait的线程...
            线程Thread-1执行完成...
            线程Thread-0执行完成...
     */
    • 多个同类型线程的场景(wait 的条件发生变化)
    //资源类
    class ValueObject {
        public static List<String> list = new ArrayList<String>();
    }
    
    //元素添加线程
    class ThreadAdd extends Thread {
    
        private String lock;
    
        public ThreadAdd(String lock,String name) {
            super(name);
            this.lock = lock;
        }
    
        @Override
        public void run() {
            synchronized (lock) {
                ValueObject.list.add("anyString");
                lock.notifyAll();               // 唤醒所有 wait 线程
            }
        }
    }
    
    //元素删除线程
    class ThreadSubtract extends Thread {
    
        private String lock;
    
        public ThreadSubtract(String lock,String name) {
            super(name);
            this.lock = lock;
        }
    
        @Override
        public void run() {
            try {
                synchronized (lock) {
                    if (ValueObject.list.size() == 0) {
                        System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
                        lock.wait();
                        System.out.println("wait   end ThreadName=" + Thread.currentThread().getName());
                    }
                    ValueObject.list.remove(0);
                    System.out.println("list size=" + ValueObject.list.size());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    //测试类
    public class Run {
        public static void main(String[] args) throws InterruptedException {
    
            //锁对象
            String lock = new String("");
    
            ThreadSubtract subtract1Thread = new ThreadSubtract(lock,"subtract1Thread");
            subtract1Thread.start();
    
            ThreadSubtract subtract2Thread = new ThreadSubtract(lock,"subtract2Thread");
            subtract2Thread.start();
    
            Thread.sleep(1000);
    
            ThreadAdd addThread = new ThreadAdd(lock,"addThread");
            addThread.start();
    
        }
    }
    /* Output: 
            wait begin ThreadName=subtract1Thread
            wait begin ThreadName=subtract2Thread
            wait   end ThreadName=subtract2Thread
            list size=0
            wait   end ThreadName=subtract1Thread
            Exception in thread "subtract1Thread"
                java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
                at java.util.ArrayList.rangeCheck(Unknown Source)
                at java.util.ArrayList.remove(Unknown Source)
                at test.ThreadSubtract.run(Run.java:49)
     */

           当线程subtract1Thread被唤醒后,将从wait处继续执行。但由于线程subtract2Thread先获取到锁得到运行,导致线程subtr-act1Thread的wait的条件发生变化(不再满足),而 线程subtract1Thread 却毫无所知,导致异常产生。像这种有多个相同类型的线程场景,为防止wait的条件发生变化而导致的线程异常终止,我们在阻塞线程被唤醒的同时还必须对wait的条件进行额外的检查,如下所示:只需将 线程类ThreadSubtract 的 run()方法中的 if 条件 改为 while 条件 即可。

    //元素删除线程
    class ThreadSubtract extends Thread {
    
        private String lock;
    
        public ThreadSubtract(String lock,String name) {
            super(name);
            this.lock = lock;
        }
    
        @Override
        public void run() {
            try {
                synchronized (lock) {
                    while (ValueObject.list.size() == 0) {    //将 if 改成 while 
                        System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
                        lock.wait();
                        System.out.println("wait   end ThreadName=" + Thread.currentThread().getName());
                    }
                    ValueObject.list.remove(0);
                    System.out.println("list size=" + ValueObject.list.size());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /* Output: 
            wait begin ThreadName=subtract1Thread
            wait begin ThreadName=subtract2Thread
            wait   end ThreadName=subtract2Thread
            list size=0
            wait   end ThreadName=subtract1Thread
            wait begin ThreadName=subtract1Thread
     */

    三、Condition

           Condition是在java 1.5中出现的,它用来替代传统的Object的wait()/notify()实现线程间的协作,它的使用依赖于 Lock,Condition、Lock 和 Thread 三者之间的关系如下图所示。相比使用Object的wait()/notify(),使用Condition的await()/signal()这种方式能够更加安全和高效地实现线程间协作。Condition是个接口,基本的方法就是await()和signal()方法。Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() 。 必须要注意的是,Condition 的 await()/signal() 使用都必须在lock保护之内,也就是说,必须在lock.lock()和lock.unlock之间才可以使用。事实上,Conditon的await()/signal() 与 Object的wait()-notify() 有着天然的对应关系:Conditon中的await()对应Object的wait();Condition中的signal()对应Object的notify();Condition中的signalAll()对应Object的notifyAll()。

    è¿éåå¾çæè¿°

           使用Condition往往比使用传统的通知等待机制(Object的wait()/notify())要更灵活、高效,例如,我们可以使用多个Condition实现通知部分线程:

    // 线程 A
    class ThreadA extends Thread {
        private MyService service;
        public ThreadA(MyService service) {
            super();
            this.service = service;
        }
        @Override
        public void run() {
            service.awaitA();
        }
    }
    // 线程 B
    class ThreadB extends Thread {
        public ThreadB(MyService service) {
            super();
            this.service = service;
        }
        @Override
        public void run() {
            service.awaitB();
        }
    }
    
    class MyService {
        private Lock lock = new ReentrantLock();
        // 使用多个Condition实现通知部分线程
        public Condition conditionA = lock.newCondition();
        public Condition conditionB = lock.newCondition();
    
        public void awaitA() {
            lock.lock();
            try {
                System.out.println("begin awaitA时间为" + System.currentTimeMillis()
                        + " ThreadName=" + Thread.currentThread().getName());
                conditionA.await();
                System.out.println("  end awaitA时间为" + System.currentTimeMillis()
                        + " ThreadName=" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void awaitB() {
            lock.lock();
            try {
                System.out.println("begin awaitB时间为" + System.currentTimeMillis()
                        + " ThreadName=" + Thread.currentThread().getName());
                conditionB.await();
                System.out.println("  end awaitB时间为" + System.currentTimeMillis()
                        + " ThreadName=" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void signalAll_A() {
            try {
                lock.lock();
                System.out.println("  signalAll_A时间为" + System.currentTimeMillis()
                        + " ThreadName=" + Thread.currentThread().getName());
                conditionA.signalAll();
            } finally {
                lock.unlock();
            }
        }
    
        public void signalAll_B() {
            try {
                lock.lock();
                System.out.println("  signalAll_B时间为" + System.currentTimeMillis()
                        + " ThreadName=" + Thread.currentThread().getName());
                conditionB.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
    
    // 测试
    public class Run {
        public static void main(String[] args) throws InterruptedException {
            MyService service = new MyService();
    
            ThreadA a = new ThreadA(service);
            a.setName("A");
            a.start();
    
            ThreadB b = new ThreadB(service);
            b.setName("B");
            b.start();
    
            Thread.sleep(3000);
            service.signalAll_A();
        }
    }

           输出结果如下图所示,我们可以看到只有线程A被唤醒,线程B仍然阻塞。实际上,Condition 实现了一种分组机制,将所有对临界资源进行访问的线程进行分组,以便实现线程间更精细化的协作,例如通知部分线程。我们可以从上面例子的输出结果看出,只有conditionA范围内的线程A被唤醒,而conditionB范围内的线程B仍然阻塞。

    å¤ä¸ªConditionéç¥é¨å线ç¨.png-13.1kB

    四、生产者/消费者模式

             等待/通知机制 最经典的应用就是 生产者-消费者模型。下面以多生产者-多消费者问题为背景,分别运用两种模式 —— synchronized+wait-notify模式和Lock+Condition模式实现 wait-notify 机制。示例一传统实现:

    //资源类
    class MyStack {
        // 共享队列
        private List list = new ArrayList();
    
        // 生产
        @SuppressWarnings("unchecked")
        public synchronized void push() {
            try {
                while (list.size() == 1) {    // 多个生产者
                    System.out.println("队列已满,线程 "
                            + Thread.currentThread().getName() + " 呈wait状态...");
                    this.wait();
                }
                list.add("anyString=" + Math.random());
                System.out.println("线程 " + Thread.currentThread().getName()
                        + " 生产了,队列已满...");
                this.notifyAll();                   // 防止生产者仅通知生产者
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        // 消费
        public synchronized String pop() {
            String returnValue = "";
            try {
                while (list.size() == 0) {              // 多个消费者
                    System.out.println("队列已空,线程 "
                            + Thread.currentThread().getName() + " 呈wait状态...");
                    this.wait();
                }
                returnValue = "" + list.get(0);
                list.remove(0);
                System.out.println("线程 " + Thread.currentThread().getName()
                        + " 消费了,队列已空...");
                this.notifyAll();                   // 防止消费者仅通知消费者
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return returnValue;
        }
    }
    
    //生产者
    class P_Thread extends Thread {
    
        private MyStack myStack;
    
        public P_Thread(MyStack myStack,String name) {
            super(name);
            this.myStack = myStack;
        }
    
        public void pushService() {
            myStack.push();
        }
    
        @Override
        public void run() {
            while (true) {     
                myStack.push();
            }
        }
    }
    
    //消费者
    class C_Thread extends Thread {
    
        private MyStack myStack;
    
        public C_Thread(MyStack myStack,String name) {
            super(name);
            this.myStack = myStack;
        }
    
        @Override
        public void run() {
            while (true) {
                myStack.pop();
            }
        }
    }
    
    //测试类
    public class Run {
        public static void main(String[] args) throws InterruptedException {
            MyStack myStack = new MyStack();
    
            P_Thread pThread1 = new P_Thread(myStack, "P1");
            P_Thread pThread2 = new P_Thread(myStack, "P2");
            P_Thread pThread3 = new P_Thread(myStack, "P3");
            P_Thread pThread4 = new P_Thread(myStack, "P4");
            P_Thread pThread5 = new P_Thread(myStack, "P5");
            P_Thread pThread6 = new P_Thread(myStack, "P6");
            pThread1.start();
            pThread2.start();
            pThread3.start();
            pThread4.start();
            pThread5.start();
            pThread6.start();
    
            C_Thread cThread1 = new C_Thread(myStack, "C1");
            C_Thread cThread2 = new C_Thread(myStack, "C2");
            C_Thread cThread3 = new C_Thread(myStack, "C3");
            C_Thread cThread4 = new C_Thread(myStack, "C4");
            C_Thread cThread5 = new C_Thread(myStack, "C5");
            C_Thread cThread6 = new C_Thread(myStack, "C6");
            C_Thread cThread7 = new C_Thread(myStack, "C7");
            C_Thread cThread8 = new C_Thread(myStack, "C8");
            cThread1.start();
            cThread2.start();
            cThread3.start();
            cThread4.start();
            cThread5.start();
            cThread6.start();
            cThread7.start();
            cThread8.start();
        }
    }
    /* Output: 
            线程 P1 生产了,队列已满...
            队列已满,线程 P1 呈wait状态...
            线程 C5 消费了,队列已空...
            队列已空,线程 C5 呈wait状态...
            队列已空,线程 C8 呈wait状态...
            队列已空,线程 C2 呈wait状态...
            队列已空,线程 C7 呈wait状态...
            队列已空,线程 C4 呈wait状态...
            队列已空,线程 C6 呈wait状态...
            队列已空,线程 C3 呈wait状态...
            队列已空,线程 C1 呈wait状态...
            线程 P6 生产了,队列已满...
            队列已满,线程 P6 呈wait状态...
            队列已满,线程 P5 呈wait状态...
            队列已满,线程 P4 呈wait状态...
            ...
     */
    

           对于生产者-消费者问题,有两个要点需要注意:第一,在多个同类型线程(多个生产者线程或者消费者线程)的场景中,为防止wait的条件发生变化而导致线程异常终止,我们在阻塞线程被唤醒的同时还必须对wait的条件进行额外的检查,即 使用 while 循环代替 if 条件;第二,在多个同类型线程(多个生产者线程或者消费者线程)的场景中,为防止生产者(消费者)唤醒生产者(消费者),保证生产者和消费者互相唤醒,需要 使用 notify 替代 notifyAll。示例二,使用condition方式实现:

    // 线程A
    class MyThreadA extends Thread {
    
        private MyService myService;
    
        public MyThreadA(MyService myService, String name) {
            super(name);
            this.myService = myService;
        }
    
        @Override
        public void run() {
            while (true)
                myService.set();
        }
    }
    
    // 线程B
    class MyThreadB extends Thread {
    
        private MyService myService;
    
        public MyThreadB(MyService myService, String name) {
            super(name);
            this.myService = myService;
        }
    
        @Override
        public void run() {
            while (true)
                myService.get();
        }
    }
    
    // 资源类
    class MyService {
    
        private ReentrantLock lock = new ReentrantLock();
        private Condition conditionA = lock.newCondition();   // 生产线程
        private Condition conditionB = lock.newCondition();  // 消费线程
        private boolean hasValue = false;
    
        public void set() {
            try {
                lock.lock();
                while (hasValue == true) {
                    System.out.println("[生产线程] " + " 线程"
                            + Thread.currentThread().getName() + " await...");
                    conditionA.await();
                }
                System.out.println("[生产中] " + " 线程" + Thread.currentThread().getName() + " 生产★");
                Thread.sleep(1000);
                hasValue = true;
                System.out.println("线程" + Thread.currentThread().getName() + " 生产完毕...");
                System.out.println("[唤醒所有消费线程] " + " 线程"
                        + Thread.currentThread().getName() + "...");
                conditionB.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void get() {
            try {
                lock.lock();
                while (hasValue == false) {
                    System.out.println("[消费线程] " + " 线程"
                            + Thread.currentThread().getName() + " await...");
                    conditionB.await();
                }
                System.out.println("[消费中] " + " 线程"
                        + Thread.currentThread().getName() + " 消费☆");
                Thread.sleep(1000);
                System.out.println("线程" + Thread.currentThread().getName() + " 消费完毕...");
                hasValue = false;
                System.out.println("[唤醒所有生产线程] " + " 线程"
                        + Thread.currentThread().getName() + "...");
                conditionA.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    
    public class Run {
        public static void main(String[] args) throws InterruptedException {
            MyService service = new MyService();
    
            MyThreadA[] threadA = new MyThreadA[10];
            MyThreadB[] threadB = new MyThreadB[10];
    
            for (int i = 0; i < 10; i++) {
                threadA[i] = new MyThreadA(service, "ThreadA-" + i);
                threadB[i] = new MyThreadB(service, "ThreadB-" + i);
                threadA[i].start();
                threadB[i].start();
            }
        }
    }
    /* Output: 
            [生产中]  线程ThreadA-0 生产★
            线程ThreadA-0 生产完毕...
            [唤醒所有消费线程]  线程ThreadA-0...
            [生产线程]  线程ThreadA-0 await...
            [消费中]  线程ThreadB-0 消费☆
            线程ThreadB-0 消费完毕...
            [唤醒所有生产线程]  线程ThreadB-0...
            [消费线程]  线程ThreadB-0 await...
            [生产中]  线程ThreadA-1 生产★
            线程ThreadA-1 生产完毕...
            [唤醒所有消费线程]  线程ThreadA-1...
            [生产线程]  线程ThreadA-1 await...
            [消费中]  线程ThreadB-1 消费☆
            线程ThreadB-1 消费完毕...
            [唤醒所有生产线程]  线程ThreadB-1...
            [消费线程]  线程ThreadB-1 await...
            [生产中]  线程ThreadA-2 生产★
            线程ThreadA-2 生产完毕...
            [唤醒所有消费线程]  线程ThreadA-2...
            ...
     */

    五、线程间的通信——管道

            PipedInputStream类 与 PipedOutputStream类 用于在应用程序中创建管道通信。一个PipedInputStream实例对象必须和一个PipedOutputStream实例对象进行连接而产生一个通信管道。PipedOutputStream可以向管道中写入数据,PipedIntputStream可以读取PipedOutputStream向管道中写入的数据,这两个类主要用来完成线程之间的通信。一个线程的PipedInputStream对象能够从另外一个线程的PipedOutputStream对象中读取数据,如下图所示:
    线ç¨é信示æå¾ä¹ç®¡é.jpg-30.5kB

           PipedInputStream和PipedOutputStream的实现原理类似于”生产者-消费者”原理,PipedOutputStream是生产者,PipedInputStream是消费者。在PipedInputStream中,有一个buffer字节数组,默认大小为1024,作为缓冲区,存放”生产者”生产出来的东西。此外,还有两个变量in和out —— in用来记录”生产者”生产了多少,out是用来记录”消费者”消费了多少,in为-1表示消费完了,in==out表示生产满了。当消费者没东西可消费的时候,也就是当in为-1的时候消费者会一直等待,直到有东西可消费。在Java的JDK 中提供了四个类用于线程间通信字节流——PipedInputStream和PipedOutputStream;字符流——PipedReader和PipedWriter;
     

    //读线程
    class ThreadRead extends Thread {
    
        private ReadData read;
        private PipedInputStream input;
    
        public ThreadRead(ReadData read, PipedInputStream input) {
            super();
            this.read = read;
            this.input = input;
        }
    
        public void readMethod(PipedInputStream input) {
            try {
                System.out.println("read  :");
                byte[] byteArray = new byte[20];
                int readLength = input.read(byteArray);
                while (readLength != -1) {
                    String newData = new String(byteArray, 0, readLength);
                    System.out.print(newData);
                    readLength = input.read(byteArray);
                }
                System.out.println();
                input.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            this.readMethod(input);
        }
    }
    
    //写线程
    class ThreadWrite extends Thread {
    
        private WriteData write;
        private PipedOutputStream out;
    
        public ThreadWrite(WriteData write, PipedOutputStream out) {
            super();
            this.write = write;
            this.out = out;
        }
    
        public void writeMethod(PipedOutputStream out) {
            try {
                System.out.println("write :");
                for (int i = 0; i < 30; i++) {
                    String outData = "" + (i + 1);
                    out.write(outData.getBytes());
                    System.out.print(outData);
                }
                System.out.println();
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            this.writeMethod(out);
        }
    }
    
    //测试
    public class Run {
    
        public static void main(String[] args) {
            try {
                WriteData writeData = new WriteData();
                ReadData readData = new ReadData();
    
                PipedInputStream inputStream = new PipedInputStream();
                PipedOutputStream outputStream = new PipedOutputStream();
    
                // inputStream.connect(outputStream);   // 效果相同
                outputStream.connect(inputStream);
    
                ThreadRead threadRead = new ThreadRead(readData, inputStream);
                threadRead.start();
    
                Thread.sleep(2000);
    
                ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
                threadWrite.start();
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    /* Output: 
            read  :
            write :
            123456789101112131415161718192021222324252627282930
            123456789101112131415161718192021222324252627282930
     */

    六、方法Join的使用

           1. join() 的定义

           假如在main线程中调用thread.join方法,则main线程会等待thread线程执行完毕或者等待一定的时间。详细地,如果调用的是无参join方法,则等待thread执行完毕;如果调用的是指定了时间参数的join方法,则等待一定的时间。join()方法有三个重载版本:

    public final synchronized void join(long millis) throws InterruptedException {...}
    public final synchronized void join(long millis, int nanos) throws InterruptedException {...}
    public final void join() throws InterruptedException {...}
    

             以 join(long millis) 方法为例,其内部调用了Object的wait()方法,如下图:

    join çå®ä¹.png-39.6kB

          根据以上源代码可以看出,join()方法是通过wait()方法 (Object 提供的方法) 实现的。当 millis == 0 时,会进入 while(isAlive()) 循环,并且只要子线程是活的,宿主线程就不停的等待。 wait(0) 的作用是让当前线程(宿主线程)等待,而这里的当前线程是指 Thread.currentThread() 所返回的线程。所以,虽然是子线程对象(锁)调用wait()方法,但是阻塞的是宿主线程。
           2.join() 使用实例及原理

    //示例代码
    public class Test {
    
        public static void main(String[] args) throws IOException  {
            System.out.println("进入线程"+Thread.currentThread().getName());
            Test test = new Test();
            MyThread thread1 = test.new MyThread();
            thread1.start();
            try {
                System.out.println("线程"+Thread.currentThread().getName()+"等待");
                thread1.join();
                System.out.println("线程"+Thread.currentThread().getName()+"继续执行");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } 
    
        class MyThread extends Thread{
            @Override
            public void run() {
                System.out.println("进入线程"+Thread.currentThread().getName());
                try {
                    Thread.currentThread().sleep(5000);
                } catch (InterruptedException e) {
                    // TODO: handle exception
                }
                System.out.println("线程"+Thread.currentThread().getName()+"执行完毕");
            }
        }
    }
    /* Output:
            进入线程main
            线程main等待
            进入线程Thread-0
            线程Thread-0执行完毕
            线程main继续执行
     */

           看上面的例子,当 main线程 运行到 thread1.join() 时,main线程会获得线程对象thread1的锁(wait 意味着拿到该对象的锁)。只要 thread1线程 存活, 就会调用该对象锁的wait()方法阻塞 main线程。那么,main线程被什么时候唤醒呢?事实上,有wait就必然有notify。在整个jdk里面,我们都不会找到对thread1线程的notify操作。这就要看jvm代码了:

    //一个c++函数:
    void JavaThread::exit(bool destroy_vm, ExitType exit_type) ;
    
    //这个函数的作用就是在一个线程执行完毕之后,jvm会做的收尾工作。里面有一行代码:ensure_join(this);
    
    该函数源码如下:
    
    static void ensure_join(JavaThread* thread) {
        Handle threadObj(thread, thread->threadObj());
    
        ObjectLocker lock(threadObj, thread);
    
        thread->clear_pending_exception();
    
        java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
    
        java_lang_Thread::set_thread(threadObj(), NULL);
    
        //thread就是当前线程,就是刚才说的thread1线程。
        lock.notify_all(thread);
    
        thread->clear_pending_exception();
    }

           至此,thread1线程对象锁调用了notifyall,那么main线程也就能继续跑下去了。由于 join方法 会调用 wait方法 让宿主线程进入阻塞状态,并且会释放线程占有的锁,并交出CPU执行权限。结合 join 方法的声明,可以总结出以下三条:join方法同样会会让线程交出CPU执行权限;join方法同样会让线程释放对一个对象持有的锁;如果调用了join方法,必须捕获InterruptedException异常或者将该异常向上层抛出。


    原文链接地址:https://blog.csdn.net/justloveyou_/article/details/54929949

    展开全文
  • 主要介绍了Java使用阻塞队列控制线程通信的方法,结合实例形式详细分析了java使用阻塞队列控制线程通信的相关原理、方法及操作注意事项,需要的朋友可以参考下
  • 本文学习并总结java多线程与线程间通信原理和方法,内容涉及java线程的众多常见重要知识点,学习后会对java多线程概念及线程间通信方式有直观清晰的了解和掌握,可以编写并分析简单的多线程程序。 进程与线程 ...
      本文学习并总结java多线程与线程间通信的原理和方法,内容涉及java线程的众多常见重要知识点,学习后会对java多线程概念及线程间通信方式有直观清晰的了解和掌握,可以编写并分析简单的多线程程序。

    进程与线程

    进程:是一个正在执行的程序。
    每一个进程执行都有执行顺序,一个执行顺序是一个执行路径,或者叫控制单元;
    每一个程序启动时,都会在内存中分配一片空间,进程就用于标识这片空间,并封装一个或若干控制单元。

    线程:就是进程中的一个独立的控制单元。
    线程控制进程的执行,一个进程至少有一个线程。

    java程序编译时,java编译器启动,对应javac.exe进程启动,编译结束后javac.exe进程退出;java程序运行时,jvm启动,对应java.exe进程启动,java.exe中有一个主线程负责java程序的执行,这个主线程运行的代码就存在于main方法中。其实jvm启动时,不止一个主线程,还有负责垃圾回收机制的线程。

    有多条执行路径的程序,称为多线程程序。多线程的好处是可以让程序的多个部分代码产生同时运行的效果,程序的多个功能分支并行执行,优化程序功能结构并提高效率。

    自定义创建线程的2种方法

    1. 继承Thread类具体步骤
        a) 自定义类,继承Thread;
        b) 复写Thread类的run()方法,run()方法中存储线程要运行的代码;
        c)  创建继承Thread的自定义类对象,调用线程的start()方法,start()方法的作用:启动线程,并自动调用run()方法。
    2. 实现Runnable接口具体步骤
        a) 定义类实现Runnable接口;
        b) 覆盖Runnable接口中的run()方法,run()中存放线程要运行的代码;
        c) 创建Thread类线程对象,并将Runnable接口的子类对象作为实参传递给Thread类构造函数;
        d) 调用Thread类对象的start()方法。
    2种方式的区别:
        实现方式时,线程代码存放在实现Runnable接口的子类的run()方法中,可以使用该子类创建多个Thread类,这样多个线程运行时可以共用Runnable子类中的成员变量,实现资源的独立共享。
        继承方式时,线程代码存放在Thread子类的run()方法中,而一个线程不能多次start(),所以达不到Thread子类中资源数据的共享使用。
        自定义线程时,建议使用实现Runnable接口的方式,因为这样还可以避免单继承的局限性。

    线程的几个零散知识点

    多线程运行结果的随机性:单核CPU环境,多个线程并非真正的同时运行,而是互相抢夺CPU的执行权限和资源,谁抢到谁执行,至于执行多长时间,CPU说了算(所以多线程程序的每次运行结果可能都不一样)(后续可以加以控制)。
    多核CPU环境,多个线程可以分布运行到多个CPU上,实现真正的同时运行。
        多核CPU环境上多个线程同时打印输出信息时,可能打印顺序混乱,这是因为多个CPU核抢占DOS输出屏是随机的,有的打印被临时阻塞。
       多核CPU时,程序运行效率就卡在了内在空间上,必须要有足够大的内存存储很多线程,才能让这些线程运行在多个CPU上。
    线程状态及状态间切换

    已start()过的线程不能再次start(), 否则会报异常java.lang.IllegalThreadStateException。
    线程的名称
    线程对象都有自己默认的名称:Thread-编号,编号从0开始。
    设置自定义线程名称,可以在子类构造函数中调用super(name), 也可以直接创建对象后调用setName()方法。
    Thread.currentThread(), 返回当前运行的线程对象,也就是this引用指向的对象。

    多线程安全问题

    当多条语句在操作多个线程共享数据时,一个线程对多条语句执行了一部分,还没执行完,另一个线程参与进来执行,会导致共享数据的错误。
    解决方法:对多条操作共享数据语句,只能让一个线程执行完,在执行过程中,其他线程不可以参与执行。
    java对多线程安全问题的专业解决方法就是同步synchronized,具体表现形式有同步代码块和同步函数。
    同步代码块

    [java]  view plain  copy
      在CODE上查看代码片 派生到我的代码片
    1. synchronized(对象) //括号中对象需手动指定,可以直接在Object对象  
    2. {  
    3.     需要被同步的代码  
    4. }  
    同步函数:将synchronized作为修饰符放在函数定义上,函数返回值类型前面。
    同步的原理
         同步代码块对象如同锁,持有锁的线程可以在同步语句中执行;没有持有锁的线程即使获得了CPU执行权,也进不去,无法执行同步代码。
         同步函数使用的锁是this对象; 静态同步函数使用的锁是该函数所在类对应的类字节码文件对象,即类名.class,该对象的类型是Class。
         synchronized修饰符不属于方法签名的一部分,当子类覆盖父类方法时,synchronized修饰符不会被继承,因此接口中方法不能被声明为synchronized,同样,构造函数也不能被声明为synchronized。
         线程进入同步代码块或同步函数前先判断锁标志位,若判断结果为真,则进入同步代码块或同步函数后,修改锁标志位为假,线程退出后,再恢复锁标志位为真。
    [java]  view plain  copy
      在CODE上查看代码片 派生到我的代码片
    1. /* 
    2. 简单的售票程序,多个窗口同时卖票 
    3. */  
    4. class Ticket implements Runnable{  
    5.     private int tick=100;  
    6.     Object obj=new Object();  
    7.     public void run(){  
    8.         while(true){  
    9.             synchronized(obj){ /*将操作共享成员数据tick的语句放到同步代码块中,用Object类对象锁住,这样不会出现卖0号票或负数票的情况*/  
    10.                 if(tick>0){  
    11.                     try{Thread.sleep(10);}catch(Exception e){e.printStackTrace();}  
    12.                     System.out.println(Thread.currentThread().getName()+"......sale : "+tick--);  
    13.                 }  
    14.             }  
    15.         }  
    16.     }  
    17. }  
    18. public class TicketDemo{  
    19.     public static void main(String[] args){  
    20.         Ticket t=new Ticket();  
    21.         Thread t1=new Thread(t);  
    22.         Thread t2=new Thread(t);  
    23.         Thread t3=new Thread(t);  
    24.         Thread t4=new Thread(t);  
    25.         t1.start();  
    26.         t2.start();  
    27.         t3.start();   
    28.         t4.start();          
    29.     }  
    30. }  

    同步的前提
    1. 必须要有2个或者2个以上的线程
    2. 必须是多个线程使用同一个锁,多个线程可以同时操作同一个锁下的代码。
    同步的弊端
    1. 线程每次进入同步代码块或同步函数都要判断锁,浪费资源,影响效率
    2. 可能出现死锁现象,多发生在一个同步代码块或同步函数中嵌套另一个同步函数或同步代码块,且2个同步上使用不同的锁。即同步中嵌套同步而锁不同就容易引发死锁。
    下面是一个很直观的死锁的例子,跟毕老师讲得MyLock的例子原理一样,只是形式上有差别:

    [java]  view plain  copy
      在CODE上查看代码片 派生到我的代码片
    1. class Zhangsan{        // 定义张三类  
    2.         public void say(){  
    3.                 System.out.println("张三对李四说:“你给我画,我就把书给你。”") ;  
    4.         }  
    5.         public void get(){  
    6.                 System.out.println("张三得到画了。") ;  
    7.         }  
    8. };  
    9. class Lisi{        // 定义李四类  
    10.         public void say(){  
    11.                 System.out.println("李四对张三说:“你给我书,我就把画给你”") ;  
    12.         }  
    13.         public void get(){  
    14.                 System.out.println("李四得到书了。") ;  
    15.         }  
    16. };  
    17. public class ThreadDeadLock implements Runnable{  
    18.         private static Zhangsan zs = new Zhangsan() ;                // 实例化static型对象  
    19.         private static Lisi ls = new Lisi() ;                // 实例化static型对象  
    20.         private boolean flag = false ;        // 声明标志位,判断那个先说话  
    21.         public void run(){        // 覆写run()方法  
    22.                 if(flag){  
    23.                         synchronized(zs){        // 同步张三  
    24.                                 zs.say() ;  
    25.                                 try{  
    26.                                         Thread.sleep(500) ;  
    27.                                 }catch(InterruptedException e){  
    28.                                         e.printStackTrace() ;  
    29.                                 }  
    30.                                 synchronized(ls){  
    31.                                         zs.get() ;  
    32.                                 }  
    33.                         }  
    34.                 }else{  
    35.                         synchronized(ls){  
    36.                                 ls.say() ;  
    37.                                 try{  
    38.                                         Thread.sleep(500) ;  
    39.                                 }catch(InterruptedException e){  
    40.                                         e.printStackTrace() ;  
    41.                                 }  
    42.                                 synchronized(zs){  
    43.                                         ls.get() ;  
    44.                                 }  
    45.                         }  
    46.                 }  
    47.         }  
    48.         public static void main(String args[]){  
    49.                 ThreadDeadLock t1 = new ThreadDeadLock() ;                // 控制张三  
    50.                 ThreadDeadLock t2 = new ThreadDeadLock() ;                // 控制李四  
    51.                 t1.flag = true ;  
    52.                 t2.flag = false ;  
    53.                 Thread thA = new Thread(t1) ;  
    54.                 Thread thB = new Thread(t2) ;  
    55.                 thA.start() ;  
    56.                 thB.start() ;  
    57.         }  
    58. };  
    59. 运行结果:  
    60. 张三对李四说:“你给我画,我就把书给你。”  
    61. 李四对张三说:“你给我书,我就把画给你”    
    62. //双方僵持在这,谁都没法继续运行  

    线程间通讯

    Object类方法wait(),notify(),notifyAll()
          线程执行wait()后,就放弃了运行资格,处于冻结状态;线程运行时,内存中会建立一个线程池,冻结状态的线程都存在于线程池中,notify()执行时唤醒的也是线程池中的线程,线程池中有多个线程时唤醒第一个被冻结的线程。
          notifyall(), 唤醒线程池中所有线程。
          wait(), notify(),notifyall()都用在同步里面,因为这3个函数是对持有锁的线程进行操作,而只有同步才有锁,所以要使用在同步中。
          wait(),notify(),notifyall(),  在使用时必须标识它们所操作的线程持有的锁,因为等待和唤醒必须是同一锁下的线程;而锁可以是任意对象,所以这3个方法都是Object类中的方法。

    wait和sleep区别:从执行权和锁上来分析这2个方法
    wait():可以指定时间也可以不指定时间,不指定时间时,只能由对应的notify()或notifyAll()来唤醒。
    sleep():必须指定时间,时间到自动从冻结状态转入运行状态或临时阻塞状态。
    wait():线程会释放执行权,并释放锁。
    sleep():线程会释放执行权,但是并不释放锁。

    单个消费者生产者例子:

    [java]  view plain  copy
      在CODE上查看代码片 派生到我的代码片
    1. class Resource{  //生产者和消费者都要操作的资源  
    2.     private String name;  
    3.     private int count=1;  
    4.     private boolean flag=false;  
    5.     public synchronized void set(String name){  
    6.         if(flag)  
    7.             try{wait();}catch(Exception e){}  
    8.         this.name=name+"---"+count++;  
    9.         System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name);  
    10.         flag=true;  
    11.         this.notify();  
    12.     }  
    13.     public synchronized void out(){  
    14.         if(!flag)  
    15.             try{wait();}catch(Exception e){}  
    16.         System.out.println(Thread.currentThread().getName()+"...消费者..."+this.name);  
    17.         flag=false;  
    18.         this.notify();  
    19.     }  
    20. }  
    21. class Producer implements Runnable{  
    22.     private Resource res;  
    23.     Producer(Resource res){  
    24.         this.res=res;  
    25.     }  
    26.     public void run(){  
    27.         while(true){  
    28.             res.set("商品");  
    29.         }  
    30.     }  
    31. }  
    32. class Consumer implements Runnable{  
    33.     private Resource res;  
    34.     Consumer(Resource res){  
    35.         this.res=res;  
    36.     }  
    37.     public void run(){  
    38.         while(true){  
    39.             res.out();  
    40.         }  
    41.     }  
    42. }  
    43. public class ProducerConsumerDemo{  
    44.     public static void main(String[] args){  
    45.         Resource r=new Resource();  
    46.         Producer pro=new Producer(r);  
    47.         Consumer con=new Consumer(r);  
    48.         Thread t1=new Thread(pro);  
    49.         Thread t2=new Thread(con);  
    50.         t1.start();  
    51.         t2.start();  
    52.     }  
    53. }//运行结果正常,生产者生产一个商品,紧接着消费者消费一个商品。  

          但是如果有多个生产者和多个消费者,上面的代码是有问题,比如2个生产者,2个消费者,运行结果就可能出现生产的1个商品生产了一次而被消费了2次,或者连续生产2个商品而只有1个被消费,这是因为此时共有4个线程在操作Resource对象r,  而notify()唤醒的是线程池中第1个wait()的线程,所以生产者执行notify()时,唤醒的线程有可能是另1个生产者线程,这个生产者线程从wait()中醒来后不会再判断flag,而是直接向下运行打印出一个新的商品,这样就出现了连续生产2个商品。
    为了避免这种情况,修改代码如下:

    [java]  view plain  copy
      在CODE上查看代码片 派生到我的代码片
    1. class Resource{  
    2.     private String name;  
    3.     private int count=1;  
    4.     private boolean flag=false;  
    5.     public synchronized void set(String name){  
    6.         while(flag) /*原先是if,现在改成while,这样生产者线程从冻结状态醒来时,还会再判断flag.*/  
    7.             try{wait();}catch(Exception e){}  
    8.         this.name=name+"---"+count++;  
    9.         System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name);  
    10.         flag=true;  
    11.         this.notifyAll();/*原先是notity(), 现在改成notifyAll(),这样生产者线程生产完一个商品后可以将等待中的消费者线程唤醒,否则只将上面改成while后,可能出现所有生产者和消费者都在wait()的情况。*/  
    12.     }  
    13.     public synchronized void out(){  
    14.         while(!flag) /*原先是if,现在改成while,这样消费者线程从冻结状态醒来时,还会再判断flag.*/  
    15.             try{wait();}catch(Exception e){}  
    16.         System.out.println(Thread.currentThread().getName()+"...消费者..."+this.name);  
    17.         flag=false;  
    18.         this.notifyAll(); /*原先是notity(), 现在改成notifyAll(),这样消费者线程消费完一个商品后可以将等待中的生产者线程唤醒,否则只将上面改成while后,可能出现所有生产者和消费者都在wait()的情况。*/  
    19.     }  
    20. }  
    21. public class ProducerConsumerDemo{  
    22.     public static void main(String[] args){  
    23.         Resource r=new Resource();  
    24.         Producer pro=new Producer(r);  
    25.         Consumer con=new Consumer(r);  
    26.         Thread t1=new Thread(pro);  
    27.         Thread t2=new Thread(con);  
    28.         Thread t3=new Thread(pro);  
    29.         Thread t4=new Thread(con);  
    30.         t1.start();  
    31.         t2.start();  
    32.         t3.start();  
    33.         t4.start();  
    34.     }  
    35. }  

    jdk1.5中,提供了多线程的升级解决方案:将同步synchronized替换为显式的Lock操作,将Object类中的wait(), notify(),notifyAll()替换成了Condition对象,该对象可以通过Lock锁对象获取; 一个Lock对象上可以绑定多个Condition对象,这样实现了本方线程只唤醒对方线程,而jdk1.5之前,一个同步只能有一个锁,不同的同步只能用锁来区分,且锁嵌套时容易死锁。

    [java]  view plain  copy
      在CODE上查看代码片 派生到我的代码片
    1. class Resource{  
    2.     private String name;  
    3.     private int count=1;  
    4.     private boolean flag=false;  
    5.     private Lock lock = new ReentrantLock();/*Lock是一个接口,ReentrantLock是该接口的一个直接子类。*/  
    6.     private Condition condition_pro=lock.newCondition(); /*创建代表生产者方面的Condition对象*/  
    7.     private Condition condition_con=lock.newCondition(); /*使用同一个锁,创建代表消费者方面的Condition对象*/  
    8.       
    9.     public void set(String name){  
    10.         lock.lock();//锁住此语句与lock.unlock()之间的代码  
    11.         try{  
    12.             while(flag)  
    13.                 condition_pro.await(); //生产者线程在conndition_pro对象上等待  
    14.             this.name=name+"---"+count++;  
    15.             System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name);  
    16.             flag=true;  
    17.              condition_con.signalAll();/*signalAll()是唤醒线程池中的所有线程,而指明调用对象是condition_con后是唤醒所有在condition_conn这个对象上等待的所有线程*/  
    18.         }  
    19.         finally{  
    20.             lock.unlock(); //unlock()要放在finally块中。  
    21.         }  
    22.     }  
    23.     public void out(){  
    24.         lock.lock(); //锁住此语句与lock.unlock()之间的代码  
    25.         try{  
    26.             while(!flag)  
    27.                 condition_con.await(); //消费者线程在conndition_con对象上等待  
    28.         System.out.println(Thread.currentThread().getName()+"...消费者..."+this.name);  
    29.         flag=false;  
    30.         condition_pro.signqlAll(); /*唤醒所有在condition_pro对象下等待的线程,也就是唤醒所有生产者线程*/  
    31.         }  
    32.         finally{  
    33.             lock.unlock();  
    34.         }  
    35.     }  
    36. }  

    线程通信的其他几个常用方法:

    终止线程
    jdk1.5起,stop()方法(非静态)已过时,不能再使用(否则会报错),终止线程的唯一方法是run()方法结束。
    开启多线程运行时,运行代码通过是循环结构,只要控制住循环,就可以让run()方法结束。
    中断线程
    interrupt()方法,如果线程在调用Object类的 wait()、wait(long) 或wait(long,int) 方法,或者该类的 join()、join(long)、join(long,int)、sleep(long) 或sleep(long,int) 方法过程中受阻,则其中断状态将被清除,它还将收到一个 InterruptedException。  
    线程的中断状态即冻结状态,interrupt()是将处于冻结状态的线程强制地恢复到运行状态。  
    守护线程
    setDaemon(), 将线程设置为守护线程,当正在运行的所有线程都是守护线程时,jvm自动退出。意思差不多是:前台线程(如main线程)结束后,后台线程(如t1,t2)也自动结束。
    setDaemon()方法必须在启动线程前调用。下面是interrupt()和setDeamon()方法的一个示例。

    [java]  view plain  copy
      在CODE上查看代码片 派生到我的代码片
    1. class StopThread implements Runnable{  
    2.     private boolean flag=true;  
    3.     public synchronized void run(){  
    4.         while(flag){  
    5.             try{  
    6.                 wait(); /*t1或t2线程处于wait()冻结状态时,即使主线程中修改了flag的值,t1和t2都 不能再判断上面while循环的终止条件,会导致2个线程一直在wait()中动不了,所以需要将t1和t2人为的唤醒*/  
    7.             }  
    8.             catch(InterruptedException e){  
    9.                 System.out.println(Thread.currentThread().getName()+"...InterruptedException");  
    10.                 flag=false;//一时接收到了InterruptedException异常,说明线程已被恢复到运行状态,这时再手动设置flag为false,让线程再次判断while循环时不再再次等待*/  
    11.             }  
    12.             System.out.println(Thread.currentThread().getName()+"...run");  
    13.         }  
    14.     }  
    15.     public void changeFlag(){  
    16.         flag=false;  
    17.     }  
    18. }  
    19. public class StopTreadDemo {  
    20.     public static void main(String[] args) {  
    21.         StopThread st=new StopThread();  
    22.         Thread t1=new Thread(st);  
    23.         Thread t2=new Thread(st);  
    24.         //t1.setDaemon(true);这2句语句执行后,t1,t2不再调用interrupt(),也能让整个程序结束,因为该程序就3个线程,main线程结束后守护线程也会随之终止。  
    25.         //t2.setDaemon(true);  
    26.         t1.start();  
    27.         t2.start();  
    28.         int num=0;  
    29.         while(true){  
    30.             if(num++==60){  
    31.                 //st.changFlag();  
    32.                 t1.interrupt(); //将线程t1的冻结状态 清除,让其处于运行  
    33.                 t2.interrupt(); //将线程t1的冻结状态 清除,让其处于运行  
    34.                 break;  
    35.             }  
    36.             System.out.println(Thread.currentThread().getName()+"..."+num);  
    37.         }  
    38.         System.out.println("over");  
    39.     }  
    40.   
    41. }  

    join()方法
    当A线程执行到了B线程的join()方法时,A就放弃运行资格,处于冻结等待状态,等B线程执行完,A才恢复运行资格;如果B线程执行过程中挂掉,那需要用interrupt()方法来清理A线程的冻结状态;join()可以用来临时加入线程执行。
    toString()方法
    返回线程名称、优先级和线程组字符串。
    默认情况下,哪个线程启动了线程t1, t1就属于哪个线程组,也可创建新的ThreadGroup对象;所有方法,包括main(),线程优先级默认是5;Thread.MAX_PRORITY为10,Thread.MIN_PROTITY为1,NOR_PRORITY为5.
    yield()方法
    暂时释放执行资格,稍微减缓线程切换的频率,让多个线程得到运行资格的机会均等一些。

    展开全文
  • JAVA线程通信的几种方式

    万次阅读 多人点赞 2017-08-12 21:55:12
    “编写两个线程,一个线程打印1~25,另一个线程打印字母A~Z,打印顺序为12A34B56C……5152Z,要求使用线程间的通信。” 这是一道非常好的面试题,非常能彰显被面者关于多线程的功力,一下子就勾起了我的兴趣。这里...
  • 主要介绍了Java Socket实现多线程通信功能,结合具体实例形式较为详细的分析了java线程通信原理及客户端、服务器端相应实现技巧,需要的朋友可以参考下
  • java线程通信、线程池

    千次阅读 2019-08-22 19:38:34
    文章目录第四章:线程状态4.1 线程状态概述4.2 Waiting (无限等待)和线程通信4.3 线程通信代码实战第五章:线程池5.1 线程池5.2 线程池的使用 第四章:线程状态 4.1 线程状态概述 当线程被创建并启动以后,它既...
  • 主要介绍了Java Socket实现单线程通信的方法,结合具体实例形式分析了java socket单线程通信原理与客户端、服务器端相关实现技巧,需要的朋友可以参考下
  • Java线程同步和通信

    千次阅读 2019-12-07 10:15:47
    线程同步 回顾 1 进程:正在运行的程序,操作系统通过进程Id区分不同进程。 2 线程:进程中的一条执行路径。一个进程中可以包含多个线程,至少有一个。 3 区别: a.一个程序运行后至少有一个进程 b.一个进程可以...
  • Java中的多线程线程通信

    万次阅读 2018-09-23 10:22:42
    线程通信: /学习笔记/ 多个线程在处理同一资源,但是任务却不同。 先看一个例子,采用两个线程执行进行输入和输出任务: //资源 class Resource { String name; String sex; } //输入 ...
  • Java局域网通信——飞鸽传书源代码 28个目标文件 内容索引:JAVA源码,媒体网络,飞鸽传书 Java局域网通信——飞鸽传书源代码,大家都知道VB版、VC版还有Delphi版的飞鸽传书软件,但是Java版的确实不多,因此这个Java...
  • Java基础知识面试题(2020最新版)

    万次阅读 多人点赞 2020-02-19 12:11:27
    原理是什么Java语言有哪些特点什么是字节码?采用字节码的最大好处是什么什么是Java程序的主类?应用程序和小程序的主类有何不同?Java应用程序与小程序之间有那些差别?Java和C++的区别Oracle JDK 和 OpenJDK 的...
  • 最近开始学习Java线程相关的知识了,想要基础入门的话推荐读《Java线程编程核心技术》,内容偏实战,想要深入理解多线程的话推荐读《Java并发编程的艺术》和《Java并发编程实战》,最近的话在看《Java并发编程的...
  • java通信一:Socket通信原理简单理解

    万次阅读 多人点赞 2017-02-04 10:07:01
    在接触Java之前,本人曾对即时通讯工具非常感兴趣。现在是网络时代,网络间通讯已经成为了每个人生活的一部分,有鉴于此,程序员在这方面...今天就来说一下socket通信的基本原理。 TCP/IP 要想理解socket首先得熟
  • Java面试题大全(2020版)

    万次阅读 多人点赞 2019-11-26 11:59:06
    发现网上很多Java面试题都没有答案,所以花了很长时间搜集整理出来了这套Java面试题大全,希望对大家有帮助哈~ 本套Java面试题大全,全的不能再全,哈哈~ 一、Java 基础 1. JDK 和 JRE 有什么区别? JDK:Java ...
  • Java线程通信

    千次阅读 2020-05-23 00:18:32
    什么是线程通信 线程通信其实就是多个线程在操作同一个资源,但是操作的动作不同 Demo前奏 两个线程操作同一对象,一个线程负责给对象切换赋值一次,另一个线程负责打印一次对象值 package com.star.test; ...
  • Java线程安全原理

    千次阅读 2017-09-08 15:27:57
    从[深入理解Java虚拟机 ],[Java并发编程的艺术]这两本书里学到了很多知识。 在学习的过程中,也总结下对多线程的理解。JVM的内存区域解释这个问题之前,先看下JVM(java虚拟机)运行时的数据区域的划分: 图中的...
  • 一客户一线程通信

    2018-09-11 15:46:07
    NetBeans工程,利用Java多线程实现客户机服务器,一客户一线程通信,通信协议为简单的Echo协议。
  • Java线程通信

    万次阅读 2019-08-21 23:26:20
    其实把ABC当做线程,这便是一个线程通信的例子。 wait(); 使执行的线程进行等待 notify(); 使停止的线程继续运行, join(); 使所属线程正常执行run方法,而当前线程暂时阻塞,有排队的作用 notify与no...
  • java线程安全

    千次阅读 2018-12-12 16:03:33
    线程不安全产生的主要原因:因为多个线程共享一个内存,所以当多个线程共享一个全局变量的时候,可能会受到其他干扰。 如线程更新会先在本地内存更新,然后再同步到共享内存中,当多个线程同时读写的时候,数据会...
  • 线程池为线程生命周期开销问题和资源不足问题提供了解决方案,因为线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控; 多线程技术...
  • 第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...
  • 进程之间通信方式 (1) 管道(PIPE) (2) 命名管道(FIFO) ...Java如何支持进程间通信。我们把Java进程理解为JVM进程。很明显,传统的这些大部分技术是无法被我们的应用程序利用了(这些进程间通信都...
  • 简述: ...OK,带着这个问题我们来学习本节内容---线程通信原理。  本篇博文主要讲解:  ① 什么是可见性、原子性、有序性。  ② JMM模型。 JMM简介:  JMM:Java Memory Model(Ja
  • Java 并发:线程通信与协作

    万次阅读 多人点赞 2017-02-08 17:43:18
    线程线程之间不是相互独立的个体,它们彼此之间需要相互通信和协作,最典型的例子就是生产者-消费者问题。本文首先介绍 wait/notify 机制,并对实现该机制的两种方式——synchronized+wait-notify模式和Lock+...
  • 线程通信原理

    千次阅读 2015-03-29 11:08:45
    从操作系统的角度讲,线程通信比进程间通信要容易的多,因为线程之间可以共享进程的内存空间。因此,他们可以共享位于进程全局数据区和栈和堆上的所有内容。 唯一只属于某个线程的就是线程的栈-------它可以存放...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 106,937
精华内容 42,774
关键字:

java线程通信原理

java 订阅