精华内容
下载资源
问答
  • 通过多线程的串行化允许线程对共享资源的访问,速度快 (2)互斥量 只有拥有互斥对象的线程才能对资源空间进行访问,因为互斥对象只有一个,所以可以保证公共资源不被多个线程访问 (3)信号量 用于控制多个线程...

    1 基本概括

    2 主要介绍

    2.1 进程通信和线程通信的概念

    进程通信

    进程相互交换数据与信息。进程间通信有两种基本模型:共享内存和消息传递(消息队列)。

    线程通信

    原因:为了更好地协作,线程无论是交替式执行,还是接力式执行,都需要进行通信告知。

    线程间通信:

    (1)临界区

    通过多线程的串行化允许线程对共享资源的访问,速度快

    (2)互斥量

    只有拥有互斥对象的线程才能对资源空间进行访问,因为互斥对象只有一个,所以可以保证公共资源不被多个线程访问

    (3)信号量

    用于控制多个线程对共享空间资源的访问,一般会限制同一时刻访问资源的最大线程数

    (4)信号

    通过通知操作的方式来控制线程间的同步,可以区分线程间的优先级

    2.2 线程通信

    2.2.1线程通信方式

    共享内存:

    共享内存这种方式比较常见,我们经常会设置一个共享变量。然后多个线程去操作同一个共享变量。从而达到线程通讯的目的。例如,我们使用多个线程去执行页面抓取任务,我们可以使用一个共享变量count来记录任务完成的数量。每当一个线程完成抓取任务,会在原来的count上执行加1操作。这样每个线程都可以通过获取这个count变量来获得当前任务的完成情况。当然必须要考虑的是共享变量的同步问题,这也共享内存容易出错的原因所在。

    这种通讯模型中,不同的线程之间是没有直接联系的。都是通过共享变量这个“中间人”来进行交互。而这个“中间人”必要情况下还需被保护在临界区内(加锁或同步)。由此可见,一旦共享变量变得多起来,并且涉及到多种不同线程对象的交互,这种管理会变得非常复杂,极容易出现死锁等问题。

    消息传递:

    消息传递方式采取的是线程之间的直接通信,不同的线程之间通过显式的发送消息来达到交互目的。消息传递最有名的方式应该是actor模型了。在这种模型下,一切都是actor,所有的actor之间的通信都必须通过传递消息才能达到。每个actor都有一个收件箱(消息队列)用来保存收到其他actor传递来的消息。actor自己也可以给自己发送消息。这才是面向对象的精髓啊!

    这种模型看起来比共享内存模型要复杂。但是一旦碰到复杂业务的话,actor模型的优势就体现出来了。

    首先我们定义一个统计actor用来统计任务完成量。然后把多个网址(消息方式)发给多个抓取actor,抓取actor处理完任务后发送消息通知统计actor任务完成,统计actor对自己保存的变量count(这个只有统计actor才能看到)加一。

    2.2.2 线程通信的具体方式

    1.volatile关键字方式

    volatile有两大特性,一是可见性,二是有序性,禁止指令重排序,其中可见性就是可以让线程之间进行通信。

    volatile语义保证线程可见性有两个原则保证

    所有volatile修饰的变量一旦被某个线程更改,必须立即刷新到主内存

    所有volatile修饰的变量在使用之前必须重新读取主内存的值

    volatile保证可见性原理图

    如果将volatile关键字去掉,线程切换一定次数后将不能感知到flag的变化,最开始能感知是线程启动时间差的原因。

    2.等待/通知机制

    等待通知机制是基于wait和notify方法来实现的,在一个线程内调用该线程锁对象的wait方法,线程将进入等待队列进行等待直到被通知或者被唤醒。

    为什么要必须获取锁?

    因为调用wait方法时,必须要先释放锁,如果没有持有锁将会抛出异常。

    wait()方法和notify()方法和notifyAll()方法

    1 .wait()方法 语义:使得当前线程立刻停止运行,处于等待状态(WAIT),并将当前线程置入锁对象的等待队列中,直到被通知(notify)或被中断为止。 使用条件:wait方法只能在同步方法或同步代码块中使用...

    2.notify()方法 语义:唤醒处于等待状态的线程 使用条件:notify()也必须在同步方法或同步代码块中调用,用来唤醒等待该...

    3.notifyAll()方法 唤醒所有处于等待状态的线程

    3.Join 方法

    方法join()的作用是等待线程销毁,方法join具有使线程排队的作用,有些类似同步的运行效果,方法join的作用是使所属的线程对象x正常执行run方法的任务,而使当前线程z进行无限期的阻塞,等待线程

    x执行完毕后再执行线程后面的代码。join与sychronized关键字的区别:jion在内部使用wait方法进行等待,而sychronized使用的是“对象监视器原理”作为同步。

    join方法的源码

      public final synchronized void join(long millis)
        throws InterruptedException {
            long base = System.currentTimeMillis();
            long now = 0;
     
            if (millis < 0) {
                throw new IllegalArgumentException("timeout value is negative");
            }
     
            if (millis == 0) {
                while (isAlive()) {
                    wait(0);
                }
            } else {
                while (isAlive()) {
                    long delay = millis - now;
                    if (delay <= 0) {
                        break;
                    }
                    wait(delay);
                    now = System.currentTimeMillis() - base;
                }
            }
      }

    4.管道

    管道流是JAVA中线程通讯的常用方式之一,基本流程如下:

    1)创建管道输出流PipedOutputStream pos和管道输入流PipedInputStream pis

    2)将pos和pis匹配,pos.connect(pis);

    3)将pos赋给信息输入线程,pis赋给信息获取线程,就可以实现线程间的通讯了

    5.threadLocal方式

    threadLocal方式的线程通信,不像以上四种方式是多个线程之间的通信,它更像是一个线程内部的通信,将当前线程和一个map绑定,在当前线程内可以任意存取数据,减省了方法调用间参数的传递。

    2.3 进程通信

    2.3.1进程通信分类

    低级通信

    由于进程的互斥和同步,需要在进程间交换一定的信息,故不少学者将它们也归为进程通信。只能传递状态和整数值(控制信息)。

    特点:传送信息量小,效率低,每次通信传递的信息量固定,若传递较多信息则需要进行多次通信。

    编程复杂:用户直接实现通信的细节,容易出现。

    高级通信

    提高信号通信的效率,传递大量数据,减轻程序编制的复杂度。

    提供三种方式:

    1.共享内存模式

    2.消息传递模式

    3.共享文件模式

    2.3.2 进程通信方式

    管道

    (1)管道(Pipe):管道可用于具有亲缘关系进程间的通信,允许一个进程和另一个与它有共同祖先的进程之间进行通信。

    (2)命名管道(named pipe):命名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关系进程间的通信。命名管道在文件系统中有对应的文件名。命名管道通过命令mkfifo或系统调用mkfifo来创建。

    系统IPC

    (3)信号(Signal):信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送 信号给进程本身;linux除了支持Unix早期信号语义函数sigal外,还支持语义符合Posix.1标准的信号函数sigaction(实际上,该函数是基于BSD的,BSD为了实现可靠信号机制,又能够统一对外接口,用sigaction函数重新实现了signal函数)。

    (4)消息(Message)队列:消息队列是消息的链接表,包括Posix消息队列system V消息队列。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量少,管道只能承载无格式字节流以及缓冲区大小受限等缺

    (5)共享内存:使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。

    (6)内存映射(mapped memory):内存映射允许任何多个进程间通信,每一个使用该机制的进程通过把一个共享的文件映射到自己的进程地址空间来实现它。

    (7)信号量(semaphore):主要作为进程间以及同一进程不同线程之间的同步手段。

    套接口

    (8)套接口(Socket):更为一般的进程间通信机制,可用于不同机器之间的进程间通信。起初是由Unix系统的BSD分支开发出来的,但现在一般可以移植到其它类Unix系统上:Linux和System V的变种都支持套接字。

    2.3.3 进程通信方式的优缺点

    效率对比:

    管道

    a、较早的一种通信方式,缺点明显:只能用于有亲缘关系进程之间的通信;只支持单向数据流,如果要双向通信需要多创建一个管道来实现。

    b、自身具备同步机制。

    c、随进程持续。

    信号

    a、这种通信可携带的信息极少。不适合需要经常携带数据的通信。

    b、不具备同步机制,类似于中断,什么时候产生信号,进程是不知道的。

    消息队列

    a、与共享内存和FIFO类似,使用一个路径名来实现各个无亲缘关系进程之间的通信。消息队列相比于其他方式有很多优点:它提供有格式的字节流,减少了开发人员的工作量;消息具有类型(system V)或优先级(posix)。其他方式都没有这些优点。

    b、具备同步机制。

    c、随内核持续。

    共享内存

    a、最快的一种通信方式,多个进程可同时访问同一片内存空间,相对其他方式来说具有更少的数据拷贝,效率较高。

    b、需要结合信号灯或其他方式来实现多个进程间同步,自身不具备同步机制。

    c、随内核持续,相比于随进程持续生命力更强。

    FIFO

    a、是有名管道,所以支持没有亲缘关系的进程通信。和共享内存类似,提供一个路径名字将各个无亲缘关系的进程关联起来。但是也需要创建两个描述符来实现双向通信。

    b、自身具备同步机制。

    c、随进程持续。

    socket

    a、使用socket通信的方式实现起来简单,可以使用因特网域和UNIX域来实现,使用因特网域可以实现不同主机之间的进出通信。

    b、该方式自身携带同步机制,不需要额外的方式来辅助实现同步。

    c、随进程持续。

    3 实现方式

    3.1 volatile实现多线程通信

    public class VolatileDemo {
        private static volatile boolean flag = true;
        public static void main(String[] args) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        if (flag){
                            System.out.println("trun on");
                            flag = false;
                        }
                    }
                }
            }).start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        if (!flag){
                            System.out.println("trun off");
                            flag = true;
                        }
                    }
                }
            }).start();
        }
    }

    3.2 等待/通知机制(wait/notify机制)

    public class WaitDemo {
        private static Object lock = new Object();
        private static  boolean flag = true;
        public static void main(String[] args) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (lock){
                        while (flag){
                            try {
                                System.out.println("wait start .......");
                                lock.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                        System.out.println("wait end ....... ");
                    }
                }
            }).start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (flag){
                        synchronized (lock){
                            if (flag){
                                lock.notify();
                                System.out.println("notify .......");
                                flag = false;
                            }
    
                        }
                    }
                }
            }).start();
        }
    }

    3.3 join方法

    /**
    * Join方法就是挂起调用线程,直到被调用线程执行完毕后再继续执行。例:threadB线程中threadA的join方法,
    * 所以threadB需在threadA执行完毕后才继续执行join后的代码,而主线程执行threadB.join(),所以最终主线程需等threadA和threadB执行完毕后才继续。
    *
    */
    public class JoinThread {
    
        public static void join() throws InterruptedException {
            long startTime = System.currentTimeMillis();
            Thread threadA = new Thread(() -> {
                try {
                    log.info("threadA start");
                    Thread.sleep(4000);
                    log.info("threadA end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            Thread threadB = new Thread(() -> {
                try {
                    threadA.join();
                    log.info("threadB start");
                    Thread.sleep(3000);
                    log.info("threadB end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            threadA.start();
            threadB.start();
    
    
            threadB.join();
            log.info("run time [{}]", startTime - System.currentTimeMillis());
            log.info("main thread end");
        }
    }

    3.4 管道

    
    public class testPipeConnection {
    
      public static void main(String[] args) {
        /**
         * 创建管道输出流
         */
        PipedOutputStream pos = new PipedOutputStream();
        /**
         * 创建管道输入流
         */
        PipedInputStream pis = new PipedInputStream();
        try {
          /**
           * 将管道输入流与输出流连接 此过程也可通过重载的构造函数来实现
           */
          pos.connect(pis);
        } catch (IOException e) {
          e.printStackTrace();
        }
        /**
         * 创建生产者线程
         */
        Producer p = new Producer(pos);
        /**
         * 创建消费者线程
         */
        Consumer1 c1 = new Consumer1(pis);
        /**
         * 启动线程
         */
        p.start();
        c1.start();
      }
    }
    
    /**
     * 生产者线程(与一个管道输入流相关联)
     * 
     */
    class Producer extends Thread {
      private PipedOutputStream pos;
    
      public Producer(PipedOutputStream pos) {
        this.pos = pos;
      }
    
      public void run() {
        int i = 0;
        try {
          while(true)
          {
          this.sleep(3000);
          pos.write(i);
          i++;
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
    
    /**
     * 消费者线程(与一个管道输入流相关联)
     * 
     */
    class Consumer1 extends Thread {
      private PipedInputStream pis;
    
      public Consumer1(PipedInputStream pis) {
        this.pis = pis;
      }
    
      public void run() {
        try {
          while(true)
          {
          System.out.println("consumer1:"+pis.read());
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }

    4 常见问题

    1 进程间通信的方式有哪些?线程间通讯方式有哪些?
    2 join方法的原理?
    3 notify和notifyall有什么区别?
    4 为什么wait方法要写在while循环里面而不是if呢?
    5 在 Java 的并发编程中,什么是等待-通知机制?它是怎么实现的?

    常见出现的问题会在后面的文章讨论,一起学习的朋友可以点点关注,会持续更新,文章有帮助的话可以长按点赞有惊喜收藏转发,有什么补充可以在下面评论,谢谢

    展开全文
  • JAVA多线程之线程间的通信方式解析一,介绍本总结我对于JAVA多线程中线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码。二,线程间的.通信方式①同步这里讲的...

    JAVA多线程之线程间的通信方式解析

    一,介绍

    本总结我对于JAVA多线程中线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码。

    二,线程间的.通信方式

    ①同步

    这里讲的同步是指多个线程通过synchronized关键字这种方式来实现线程间的通信。

    参考示例:

    由于线程A和线程B持有同一个MyObject类的对象object,尽管这两个线程需要调用不同的方法,但是它们是同步执行的,比如:线程B需要等待线程A执行完了methodA()方法之后,它才能执行methodB()方法。这样,线程A和线程B就实现了 通信。

    这种方式,本质上就是“共享内存”式的通信。多个线程需要访问同一个共享变量,谁拿到了锁(获得了访问权限),谁就可以执行。

    ②while轮询的方式

    代码如下:

    ?

    在这种方式下,线程A不断地改变条件,线程ThreadB不停地通过while语句检测这个条件(list.size()==5)是否成立 ,从而实现了线程间的通信。但是这种方式会浪费CPU资源。之所以说它浪费资源,是因为JVM调度器将CPU交给线程B执行时,它没做啥“有用”的工作,只是在不断地测试 某个条件是否成立。就类似于现实生活中,某个人一直看着手机屏幕是否有电话来了,而不是: 在干别的事情,当有电话来时,响铃通知TA电话来了。

    ③wait/notify机制

    代码如下:

    ?

    线程A要等待某个条件满足时(list.size()==5),才执行操作。线程B则向list中添加元素,改变list 的size。

    A,B之间如何通信的呢?也就是说,线程A如何知道 list.size() 已经为5了呢?

    这里用到了Object类的 wait() 和 notify() 方法。

    当条件未满足时(list.size() !=5),线程A调用wait() 放弃CPU,并进入阻塞状态。---不像②while轮询那样占用CPU

    当条件满足时,线程B调用 notify()通知 线程A,所谓通知线程A,就是唤醒线程A,并让它进入可运行状态。

    这种方式的一个好处就是CPU的利用率提高了。

    但是也有一些缺点:比如,线程B先执行,一下子添加了5个元素并调用了notify()发送了通知,而此时线程A还执行;当线程A执行并调用wait()时,那它永远就不可能被唤醒了。因为,线程B已经发了通知了,以后不再发通知了。这说明:通知过早,会打乱程序的执行逻辑。

    【JAVA多线程之线程间的通信方式解析】相关文章:

    展开全文
  • 本总结我对于JAVA多线程中线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码。 二,线程间的通信方式 ①同步 这里讲的同步是指多个线程通过synchronized关键字...

    一,介绍

    本总结我对于JAVA多线程中线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码。

    二,线程间的通信方式

    ①同步

    这里讲的同步是指多个线程通过synchronized关键字这种方式来实现线程间的通信。

    参考示例:

    public class MyObject {
    
        synchronized public void methodA() {
            //do something....
        }
    
        synchronized public void methodB() {
            //do some other thing
        }
    }
    
    public class ThreadA extends Thread {
    
        private MyObject object;
    //省略构造方法
        @Override
        public void run() {
            super.run();
            object.methodA();
        }
    }
    
    public class ThreadB extends Thread {
    
        private MyObject object;
    //省略构造方法
        @Override
        public void run() {
            super.run();
            object.methodB();
        }
    }
    
    public class Run {
        public static void main(String[] args) {
            MyObject object = new MyObject();
    
            //线程A与线程B 持有的是同一个对象:object
            ThreadA a = new ThreadA(object);
            ThreadB b = new ThreadB(object);
            a.start();
            b.start();
        }
    }
    
    

    由于线程A和线程B持有同一个MyObject类的对象object,尽管这两个线程需要调用不同的方法,但是它们是同步执行的,比如:线程B需要等待线程A执行完了methodA()方法之后,它才能执行methodB()方法。这样,线程A和线程B就实现了 通信。

    这种方式,本质上就是“共享内存”式的通信。多个线程需要访问同一个共享变量,谁拿到了锁(获得了访问权限),谁就可以执行。

    ②while轮询的方式

    代码如下:

     1 import java.util.ArrayList;
     2 import java.util.List;
     3 
     4 public class MyList {
     5 
     6     private List<String> list = new ArrayList<String>();
     7     public void add() {
     8         list.add("elements");
     9     }
    10     public int size() {
    11         return list.size();
    12     }
    13 }
    14 
    15 import mylist.MyList;
    16 
    17 public class ThreadA extends Thread {
    18 
    19     private MyList list;
    20 
    21     public ThreadA(MyList list) {
    22         super();
    23         this.list = list;
    24     }
    25 
    26     @Override
    27     public void run() {
    28         try {
    29             for (int i = 0; i < 10; i++) {
    30                 list.add();
    31                 System.out.println("添加了" + (i + 1) + "个元素");
    32                 Thread.sleep(1000);
    33             }
    34         } catch (InterruptedException e) {
    35             e.printStackTrace();
    36         }
    37     }//需要获取资料的朋友请加Q君样:290194256*
    38 }
    39 
    40 import mylist.MyList;
    41 
    42 public class ThreadB extends Thread {
    43 
    44     private MyList list;
    45 
    46     public ThreadB(MyList list) {
    47         super();
    48         this.list = list;
    49     }
    50 
    51     @Override
    52     public void run() {
    53         try {
    54             while (true) {
    55                 if (list.size() == 5) {
    56                     System.out.println("==5, 线程b准备退出了");
    57                     throw new InterruptedException();
    58                 }
    59             }
    60         } catch (InterruptedException e) {
    61             e.printStackTrace();
    62         }
    63     }
    64 }
    65 
    66 import mylist.MyList;
    67 import extthread.ThreadA;
    68 import extthread.ThreadB;
    69 
    70 public class Test {
    71 
    72     public static void main(String[] args) {
    73         MyList service = new MyList();
    74 
    75         ThreadA a = new ThreadA(service);
    76         a.setName("A");
    77         a.start();
    78 
    79         ThreadB b = new ThreadB(service);
    80         b.setName("B");
    81         b.start();
    82     }
    83 }
    

    在这种方式下,线程A不断地改变条件,线程ThreadB不停地通过while语句检测这个条件(list.size()==5)是否成立 ,从而实现了线程间的通信。但是这种方式会浪费CPU资源。之所以说它浪费资源,是因为JVM调度器将CPU交给线程B执行时,它没做啥“有用”的工作,只是在不断地测试 某个条件是否成立。就类似于现实生活中,某个人一直看着手机屏幕是否有电话来了,而不是: 在干别的事情,当有电话来时,响铃通知TA电话来了。关于线程的轮询的影响,可参考:JAVA多线程之当一个线程在执行死循环时会影响另外一个线程吗?

    线程都是先把变量读取到本地线程栈空间,然后再去再去修改的本地变量。因此,如果线程B每次都在取本地的 条件变量,那么尽管另外一个线程已经改变了轮询的条件,它也察觉不到,这样也会造成死循环。

    ③wait/notify机制

    代码如下:

     1 import java.util.ArrayList;
     2 import java.util.List;
     3 
     4 public class MyList {
     5 
     6     private static List<String> list = new ArrayList<String>();
     7 
     8     public static void add() {
     9         list.add("anyString");
    10     }//需要获取资料的朋友请加Q君样:290194256*
    11 
    12     public static int size() {
    13         return list.size();
    14     }
    15 }
    16 
    17 
    18 public class ThreadA extends Thread {
    19 
    20     private Object lock;
    21 
    22     public ThreadA(Object lock) {
    23         super();
    24         this.lock = lock;
    25     }
    26 
    27     @Override
    28     public void run() {
    29         try {
    30             synchronized (lock) {
    31                 if (MyList.size() != 5) {
    32                     System.out.println("wait begin "
    33                             + System.currentTimeMillis());
    34                     lock.wait();
    35                     System.out.println("wait end  "
    36                             + System.currentTimeMillis());
    37                 }//需要获取资料的朋友请加Q君样:290194256*
    38             }
    39         } catch (InterruptedException e) {
    40             e.printStackTrace();
    41         }
    42     }
    43 }
    44 
    45 
    46 public class ThreadB extends Thread {
    47     private Object lock;
    48 
    49     public ThreadB(Object lock) {
    50         super();
    51         this.lock = lock;
    52     }
    53 
    54     @Override
    55     public void run() {
    56         try {
    57             synchronized (lock) {
    58                 for (int i = 0; i < 10; i++) {
    59                     MyList.add();
    60                     if (MyList.size() == 5) {
    61                         lock.notify();
    62                         System.out.println("已经发出了通知");
    63                     }
    64                     System.out.println("添加了" + (i + 1) + "个元素!");
    65                     Thread.sleep(1000);
    66                 }
    67             }
    68         } catch (InterruptedException e) {
    69             e.printStackTrace();
    70         }
    71     }
    72 }
    73 
    74 public class Run {
    75 
    76     public static void main(String[] args) {
    77 
    78         try {
    79             Object lock = new Object();
    80 
    81             ThreadA a = new ThreadA(lock);
    82             a.start();
    83 
    84             Thread.sleep(50);
    85 
    86             ThreadB b = new ThreadB(lock);
    87             b.start();
    88         } catch (InterruptedException e) {
    89             e.printStackTrace();
    90         }//需要获取资料的朋友请加Q君样:290194256*
    91     }
    92 }
    

    线程A要等待某个条件满足时(list.size()==5),才执行操作。线程B则向list中添加元素,改变list 的size。

    A,B之间如何通信的呢?也就是说,线程A如何知道 list.size() 已经为5了呢?

    这里用到了Object类的 wait() 和 notify() 方法。

    当条件未满足时(list.size() !=5),线程A调用wait() 放弃CPU,并进入阻塞状态。—不像while轮询那样占用CPU
    当条件满足时,线程B调用 notify()通知 线程A,所谓通知线程A,就是唤醒线程A,并让它进入可运行状态。

    这种方式的一个好处就是CPU的利用率提高了。

    但是也有一些缺点:比如,线程B先执行,一下子添加了5个元素并调用了notify()发送了通知,而此时线程A还执行;当线程A执行并调用wait()时,那它永远就不可能被唤醒了。因为,线程B已经发了通知了,以后不再发通知了。这说明:通知过早,会打乱程序的执行逻辑。

    ④管道通信就是使用java.io.PipedInputStream 和 java.io.PipedOutputStream进行通信

    具体就不介绍了。分布式系统中说的两种通信机制:共享内存机制和消息通信机制。感觉前面的synchronized关键字和while轮询 “属于” 共享内存机制,由于是轮询的条件使用了volatile关键字修饰时,这就表示它们通过判断这个“共享的条件变量“是否改变了,来实现进程间的交流。

    而管道通信,更像消息传递机制,也就是说:通过管道,将一个线程中的消息发送给另一个。

    在这里插入图片描述
    在这里插入图片描述
    最新2020整理收集的一些高频面试题(都整理成文档),
    有很多干货,包含mysql,netty,spring,线程,spring cloud、jvm、
    源码、算法等详细讲解,也有详细的学习规划图,面试题整理等,
    需要获取这些内容的朋友请加Q君样:290194256*

    展开全文
  • Java多线程-通讯方式

    2021-09-11 14:08:02
    Java多线程-通讯方式 线程之间为什么要通信? 通信的目的是为了更好的协作,线程无论是交替式执行,还是接力式执行,都需要进行通信告知。那么java线程是如何通信的呢,大致有以下四种方式。 Java线程的通信方式 ...

    Java多线程-通讯方式

    线程之间为什么要通信?

    通信的目的是为了更好的协作,线程无论是交替式执行,还是接力式执行,都需要进行通信告知。那么java线程是如何通信的呢,大致有以下四种方式。

    Java线程的通信方式

    首先,要线程间通信的模型有两种:共享内存和消息传递

    方式一:使用 volatile 关键字

    基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想,大致意思就是多个线程同时监听一个变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。这也是最简单的一种实现方式。代码如下所示:

    package duo;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class Mythread {
    
            // 定义一个共享变量来实现通信,它需要是volatile修饰,否则线程不能及时感知
            static volatile boolean notice = false;
    
            public static void main(String[] args) {
                List<String> list = new ArrayList<>();
    
                // 实现线程A
                Thread threadA = new Thread(() -> {
                    for (int i = 1; i <= 10; i++) {
                        list.add("abc");
                        System.out.println("线程A向list中添加一个元素,此时list中的元素个数为:" + list.size());
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        if (list.size() == 5)
                            notice = true;
                    }
                });
    
                // 实现线程B
                Thread threadB = new Thread(() -> {
                    while (true) {
                        if (notice) {
                            System.out.println("线程B收到通知,开始执行自己的业务...");
                            break;
                        }
                    }
                });
    
                // 需要先启动线程B, B相当于是监听的动作
                threadB.start();
    
    
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 再启动线程A
                threadA.start();
            }
    
    
    }
    
    
    

    线程A向list中添加一个元素,此时list中的元素个数为:1
    线程A向list中添加一个元素,此时list中的元素个数为:2
    线程A向list中添加一个元素,此时list中的元素个数为:3
    线程A向list中添加一个元素,此时list中的元素个数为:4
    线程A向list中添加一个元素,此时list中的元素个数为:5
    线程A向list中添加一个元素,此时list中的元素个数为:6
    线程B收到通知,开始执行自己的业务…
    线程A向list中添加一个元素,此时list中的元素个数为:7
    线程A向list中添加一个元素,此时list中的元素个数为:8
    线程A向list中添加一个元素,此时list中的元素个数为:9
    线程A向list中添加一个元素,此时list中的元素个数为:10

    方式二:使用Object类的wait() 和 notify() 方法

    1、wait()、notify()/notifyAll() 方法是Object的本地final方法,无法被重写。

    2、wait()使当前线程阻塞,前提是 必须先获得锁,一般配合synchronized 关键字使用,即,一般在synchronized 同步代码块里使用 wait()、notify/notifyAll() 方法。

    3、 由于 wait()、notify/notifyAll() 在synchronized 代码块执行,说明当前线程一定是获取了锁的。

    当线程执行wait()方法时候,会释放当前的锁,然后让出CPU,进入等待状态。

    只有当 notify/notifyAll() 被执行时候,才会唤醒一个或多个正处于等待状态的线程,然后继续往下执行,直到执行完synchronized 代码块的代码或是中途遇到wait() ,再次释放锁。

    也就是说,notify/notifyAll() 的执行只是唤醒沉睡的线程,而不会立即释放锁

    注意: 所以很多情况下你发现使用了notify进行唤醒了 但是没效果 还记得synchronized是干嘛的吗 是同步线程的作用在同一时间只允许一个线程执行 当这个线程执行完毕后才释放锁对象

    所以notify只是唤醒等待池中的线程进入准备状态 但是如果正在执行的线程一直不交出锁的权限那么 处于唤醒准备状态的线程一直不会执行

    所以在编程中,尽量在使用了notify/notifyAll() 后立即退出临界区,以唤醒其他线程让其获得锁

    问题演示:

    A 使用wait() B 使用notify()进行唤醒A 但是这前提条件是B执行完毕将锁对象交出去 其他线程才会执行 结果如下:

    A使用wait() 进入等待状态

    B

    B

    B满足条件使用notify()唤醒A

    B

    B 执行完毕交出锁对象

    A接收到锁对象 继续从上次位置执行

    可以通过结果发现 当唤醒线程不能立即执行

    解决办法就是在使用notify()的时候将当前线程锁对象交出去

    A使用wait() 进入等待状态

    B

    B

    B满足条件使用notify()唤醒A 然后B使用wait() 进入等待状态 将当前锁对象释放

    A接收到锁对象 继续从上次位置执行

    A

    A 执行完后 使用 notify() 唤醒B

    B

    B

    AB程序结束

    要注意,notify唤醒沉睡的线程后,线程会接着上次的执行继续往下执行。

      private  static volatile int num=0;
    
        private   static Object lock=new Object();
        public static void main(String[] args) {
    
          Thread a= new Thread(new Runnable() {
                @Override
                public void run() {
    
                    synchronized (lock) {
                        for (int i = 0; i < 10; i++) {
                            System.out.println("A线程");
                            num++;
                            if(num==5){
                                System.out.println("唤醒b线程");
                                lock.notify();
    
                                System.out.println("A线程释放锁对象  进入等待状态");
                                try {
                                    lock.wait();
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }
                    System.out.println("A 线程执行完毕  AB程序结束");
                }
            });
    
            Thread b= new Thread(new Runnable() {
                @Override
                public void run() {
    
                        while (Thread.interrupted()==false){   //  检测是否是中断状态 false 代表没有被中断
                            synchronized (lock) {
                                if(num==5){
    
                                    try {
                                        Thread.sleep(1000);
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                    System.out.println("B");
    
                                    System.out.println("B执行完毕 设置中断");
    
                                    Thread.currentThread().interrupt(); //设置中断 true
                                }else{
    
                                    System.out.println("B线程释放锁对象  进入等待状态");
                                    try {
                                        lock.wait();
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                    System.out.println("B线程获取到锁对象  开始执行");
                                }
                            }
                        }
    
                    System.out.println("B 线程执行完成  唤醒A线程");
                    synchronized (lock) {
                        lock.notify();
                    }
    
                }
            });
    
    
    // 先让b线程执行   进行监控
            b.start();
    
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            a.start();
        }
    
    

    B线程释放锁对象 进入等待状态
    A线程
    A线程
    A线程
    A线程
    A线程
    唤醒b线程
    A线程释放锁对象 进入等待状态
    B线程获取到锁对象 开始执行
    B
    B执行完毕 设置中断
    B 线程执行完成 唤醒A线程
    A线程
    A线程
    A线程
    A线程
    A线程
    A 线程执行完毕 AB程序解决

    实现生产者和消费者问题

    1、生产者产生资源往池子里添加,前提是池子没有满,如果池子满了,则生产者暂停生产,直到自己的生成能放下池子。

    2、消费者消耗池子里的资源,前提是池子的资源不为空,否则消费者暂停消耗,进入等待直到池子里有资源数满足自己的需求。

    接口

    public interface AbstractStorage {
        void consume(int num); //消费者
        void produce(int num);  //生产者
    }
    

    Producer (生产者)

    public class Producer extends Thread{
        //每次生产的数量
        private int num ;
    
        //所属的仓库
        public AbstractStorage abstractStorage;
    
        public Producer(AbstractStorage abstractStorage){
            this.abstractStorage = abstractStorage;
        }
    
        public void setNum(int num){
            this.num = num;
        }
    
        // 线程run函数
        @Override
        public void run()
        {
            produce(num);
        }
    
        // 调用仓库Storage的生产函数
        public void produce(int num)
        {
            abstractStorage.produce(num);
        }
    }
    

    Consumer(消费者)

    public class Consumer extends Thread{
        // 每次消费的产品数量
        private int num;
    
        // 所在放置的仓库
        private AbstractStorage abstractStorage1;
    
        // 构造函数,设置仓库
        public Consumer(AbstractStorage abstractStorage1)
        {
            this.abstractStorage1 = abstractStorage1;
        }
    
        // 线程run函数
        public void run()
        {
            consume(num);
        }
    
        // 调用仓库Storage的生产函数
        public void consume(int num)
        {
            abstractStorage1.consume(num);
        }
    
        public void setNum(int num){
            this.num = num;
        }
    }
    
    

    Storage(仓库)

    
    import java.util.LinkedList;
    
    public class Storage implements AbstractStorage {
        //仓库最大容量
        private final int MAX_SIZE = 100;
        //仓库存储的载体
        private LinkedList list = new LinkedList();
    
        //生产产品
        public void produce(int num){
            //同步
            synchronized (list){
                //仓库剩余的容量不足以存放即将要生产的数量,暂停生产
                while(list.size()+num > MAX_SIZE){
                    System.out.println("【要生产的产品数量】:" + num + "\t【库存量】:"
                            + list.size() + "\t暂时不能执行生产任务!");
    
                    try {
                        //条件不满足,生产阻塞
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                for(int i=0;i<num;i++){
                    list.add(new Object());
                }
    
                System.out.println("【已经生产产品数】:" + num + "\t【现仓储量为】:" + list.size());
    
                list.notifyAll();
            }
        }
    
        //消费产品
        public void consume(int num){
            synchronized (list){
    
                //不满足消费条件
                while(num > list.size()){
                    System.out.println("【要消费的产品数量】:" + num + "\t【库存量】:"
                            + list.size() + "\t暂时不能执行生产任务!");
    
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                //消费条件满足,开始消费
                for(int i=0;i<num;i++){
                    list.remove();
                }
    
                System.out.println("【已经消费产品数】:" + num + "\t【现仓储量为】:" + list.size());
    
                list.notifyAll();
            }
        }
    }
    
    

    Test

    public class Test{
        public static void main(String[] args) {
            // 仓库对象
            AbstractStorage abstractStorage = new Storage();
    
         // 创建7生产者对象
            Producer p1 = new Producer(abstractStorage);
            Producer p2 = new Producer(abstractStorage);
            Producer p3 = new Producer(abstractStorage);
            Producer p4 = new Producer(abstractStorage);
            Producer p5 = new Producer(abstractStorage);
            Producer p6 = new Producer(abstractStorage);
            Producer p7 = new Producer(abstractStorage);
    
            // 消费者对象
            Consumer c1 = new Consumer(abstractStorage);
            Consumer c2 = new Consumer(abstractStorage);
            Consumer c3 = new Consumer(abstractStorage);
    
            // 设置7个生产者产品生产数量
            p1.setNum(10);
            p2.setNum(10);
            p3.setNum(10);
            p4.setNum(10);
            p5.setNum(10);
            p6.setNum(10);
            p7.setNum(80);
    
            // 设置消费者产品消费数量
            c1.setNum(50);
            c2.setNum(20);
            c3.setNum(30);
    
            // 消费者线程开始执行  进行监听
            c1.start();
            c2.start();
            c3.start();
    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        //生产者 线程开始
            p1.start();
            p2.start();
            p3.start();
            p4.start();
            p5.start();
            p6.start();
            p7.start();
        }
    }
    
    

    【要消费的产品数量】:50 【库存量】:0 暂时不能执行生产任务!
    【要消费的产品数量】:30 【库存量】:0 暂时不能执行生产任务!
    【要消费的产品数量】:20 【库存量】:0 暂时不能执行生产任务!
    【已经生产产品数】:10 【现仓储量为】:10
    【已经生产产品数】:10 【现仓储量为】:20
    【已经生产产品数】:80 【现仓储量为】:100
    【要生产的产品数量】:10 【库存量】:100 暂时不能执行生产任务!
    【要生产的产品数量】:10 【库存量】:100 暂时不能执行生产任务!
    【已经消费产品数】:20 【现仓储量为】:80
    【已经消费产品数】:30 【现仓储量为】:50
    【已经消费产品数】:50 【现仓储量为】:0
    【已经生产产品数】:10 【现仓储量为】:10
    【已经生产产品数】:10 【现仓储量为】:20
    【已经生产产品数】:10 【现仓储量为】:30
    【已经生产产品数】:10 【现仓储量为】:40

    方式3:使用ReentrantLock结合Condition

    和使用Object的wait()、notify 效果差不多

    只是前者需要在 synchronized 代码块执行 而后者需要在lock加锁后才能使用

    此方法在博客 java多线程-Lock锁 这篇文章里有教程 这里就不说了

    方式四:使用JUC工具类

    jdk1.5之后在java.util.concurrent包下提供了很多并发编程相关的工具类,简化了我们的并发编程代码的书写,

    • CountDownLatch:用于某个线程A等待若干个其他线程执行完之后,它才执行
    • CyclicBarrier:一组线程等待至某个状态之后再全部同时执行
    • Semaphore:用于控制对某组资源的访问权限

    Object和Condition休眠唤醒区别

    • object wait()必须在synchronized(同步锁)下使用,

    • object wait()必须要通过Nodify()方法进行唤醒

    • condition await() 必须和Lock(互斥锁/共享锁)配合使用

    • condition await() 必须通过 signal() 方法进行唤醒

    CountDownLatch

    CountDownLatch概念

    CountDownLatch是在java1.5被引入的,存在于java.util.concurrent包下。

    CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。

    CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。

    每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

    CountDownLatch的用法

    CountDownLatch典型用法:

    1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

    2、实现多个线程的并行,而不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。

    CountDownLatch的不足

    CountDownLatch是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

    CountDownLatch方法使用说明

    //递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少.
    public void countDown()
    
    //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
    public void await() throws InterruptedException { };  
    
    /*
       timeout 要等待的最长时间
      unit    参数的时间单位
      和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
    */
    
    public boolean await(long timeout,TimeUnit unit) throws InterruptedException
    

    案例

    例子1: 主线程等待子线程执行完成在执行

    public static void main(String[] args) {
            ExecutorService service = Executors.newFixedThreadPool(3);
            final CountDownLatch latch = new CountDownLatch(3);
            for (int i = 0; i < 3; i++) {
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("子线程" + Thread.currentThread().getName() + "开始执行");
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("子线程"+Thread.currentThread().getName()+"执行完成");
                            latch.countDown();//当前线程调用此方法,则计数减一
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                service.execute(runnable);
            }
    
            try {
                System.out.println("主线程"+Thread.currentThread().getName()+"等待子线程执行完成...");
                latch.await();//阻塞当前主线程,直到CountDownLatch里计数器的值为0开始释放
                System.out.println("主线程"+Thread.currentThread().getName()+"开始执行...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    例子2:

    百米赛跑,4名运动员选手到达场地等待裁判口令,裁判一声口令,选手听到后同时起跑,当所有选手到达终点,裁判进行汇总排名

    public static void main(String[] args) {
            ExecutorService service = Executors.newCachedThreadPool();
            final CountDownLatch cdOrder = new CountDownLatch(1); //裁判
            final CountDownLatch cdAnswer = new CountDownLatch(4); //选手
            for (int i = 0; i < 4; i++) {
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("选手" + Thread.currentThread().getName() + "正在等待裁判发布口令");
                            cdOrder.await();
                            System.out.println("选手" + Thread.currentThread().getName() + "已接受裁判口令");
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("选手" + Thread.currentThread().getName() + "到达终点");
                            cdAnswer.countDown();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                service.execute(runnable);
            }
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("裁判"+Thread.currentThread().getName()+"即将发布口令");
                cdOrder.countDown();  //裁判发送口令4个选手线程 并行运行 注意不是并发
                System.out.println("裁判"+Thread.currentThread().getName()+"已发送口令,正在等待所有选手到达终点");
                cdAnswer.await();  //阻塞主线程 等待所有子线程运行完
                System.out.println("所有选手都到达终点");
                System.out.println("裁判"+Thread.currentThread().getName()+"汇总成绩排名");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            service.shutdown();//结束线程池
        }
    

    CyclicBarrier

    现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。

    在JUC包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是CyclicBarrier类。利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。下图演示了这一过程。

    在举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier。

    接下来我们看看它的构造器

    //构造器1
    public CyclicBarrier(int parties, Runnable barrierAction) {
      if (parties <= 0) throw new IllegalArgumentException();
      this.parties = parties;
      this.count = parties;
      this.barrierCommand = barrierAction;
    }
     
    //构造器2
    public CyclicBarrier(int parties) {
      this(parties, null);
    }
    
    

    CyclicBarrier有两个构造器,其中构造器1是它的核心构造器

    参数1: 在这里你可以指定本局游戏的参与者数量(要拦截的线程数)

    参数2: 本局结束时要执行的任务,(也就是所有线程执行完后执行的线程)

    CyclicBarrier供了两种等待的方法,分别是定时等待和非定时等待。

    //非定时等待 (比如指定3个线程 每一个线程调用一次内部count--  当count==0时 释放3个线程 然后count重置为3  )
    public int await() throws InterruptedException, BrokenBarrierException {
      try {
        return dowait(false, 0L);
      } catch (TimeoutException toe) {
        throw new Error(toe);
      }
    }
     
    //定时等待 (就是子一定时间内如果还没有 到时间自动唤醒)
    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
      return dowait(true, unit.toNanos(timeout));
    }
    

    参数1 : timeout 时间

    参数2: 时间单位 TimeUnit.SECONDS (秒)

    案例1:

     public static void main(String[] args) {
            // 设置线程个数为2  当个数=0时释放所有等待的线程   然后恢复线程的个数为2
            CyclicBarrier cb = new CyclicBarrier(2);
            new Thread(()->{
                System.out.println("线程1开始.."+new Date());
                try {
                    cb.await(); // 2-1=1当个数不足时,等待
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("线程1继续向下运行..."+new Date());
            }).start();
    
    
            new Thread(()->{
                System.out.println("线程2开始.."+new Date());
                try { Thread.sleep(2000); } catch (InterruptedException e) { }
    
                try {
                    cb.await(); // 1-1=0 线程个数够2,释放所有等待的线程
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("线程2继续向下运行..."+new Date());
            }).start();
    
        }
    

    看懂这个后那么下面的这个案例你就能看懂了

    案例2:

    赛马

    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class MyLock{
    
        //马类
        static class Horse implements Runnable {
    
            private static int counter = 0;
            private final int id = counter++;  //每次创建一个新对象的时候都会在原来id基础上+1
    
    
            private int strides = 0;   //赛马每次随机跑几步 最多0~3步
            private static Random rand = new Random();//随机数
            private static CyclicBarrier barrier;//获取
    
            public Horse(CyclicBarrier b) { barrier = b; }
    
            @Override
            public void run() {
                try {
                    while(!Thread.interrupted()) { //!中断标识(false)
                        synchronized(this) {
                            //赛马每次随机跑几步
                            strides += rand.nextInt(3);
                        }
                        barrier.await();//线程进入等待状态
                    }
                } catch(Exception e) {
                    e.printStackTrace();
                }
            }
    
            //使用*来模拟马跑的痕迹
            public String tracks() {
                StringBuilder s = new StringBuilder();
                for(int i = 0; i < getStrides(); i++) {
                    s.append("*");
                }
                s.append(id);
                return s.toString();
            }
    
            public synchronized int getStrides() { return strides; }
    
            public String toString() { return "Horse " + id + " "; }
    
        }
    
        //赛马
        public static class HorseRace implements Runnable {
    
            private static final int FINISH_LINE = 75;//赛道长度
            private static List<Horse> horses = new ArrayList<Horse>();//存储马的线程 (赛道)
            private static ExecutorService exec = Executors.newCachedThreadPool();//线程池
    
            @Override
            public void run() {
                StringBuilder s = new StringBuilder();
                //打印赛道边界
                for(int i = 0; i < FINISH_LINE; i++) {
                    s.append("=");
                }
                System.out.println(s);
                //打印赛马轨迹
                for(Horse horse : horses) {
                    System.out.println(horse.tracks());
    
                }
    
                //判断是否结束(只要有一匹马跑到终点那么将结束比赛 其他马将不用在跑了)
                for(Horse horse : horses) {
                    if(horse.getStrides() >= FINISH_LINE) {
                        System.out.println(horse.toString() + "won!");
                        exec.shutdownNow();//调用线程池的结束全部线程方法不管是否还在运行 中断标识设置为true
                        return;//结束run方法
                    }
                }
                //控制台刷新比赛的频率
                try {
                    Thread.sleep(200);
                } catch(InterruptedException e) {
                    System.out.println("barrier-action sleep interrupted");
                }
            }
    
            public static void main(String[] args) {
                CyclicBarrier barrier = new CyclicBarrier(7, new HorseRace());
                for(int i = 0; i < 7; i++) {
                    Horse horse = new Horse(barrier);
                    horses.add(horse);//录入马到赛道里
                    exec.execute(horse);//将线程录入线程池里
                }
            }
    
        }
    
    
    
    
    }
    

    Semaphore

    Semaphore 是什么

    Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

    可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。

    使用场景

    用于那些资源有明确访问数量限制的场景,常用于限流 。

    比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。

    比如:停车场场景,车位数量有限,同时只能容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。

    Semaphore常用方法说明

    acquire()  
    获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。 
    (简单来说就是没有令牌就处于阻塞状态)
        
    acquire(int permits)  
    获取指定几个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
        
    acquireUninterruptibly() 
    获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。
        
    tryAcquire()
    尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
        
    tryAcquire(long timeout, TimeUnit unit)
    尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
    
    release()
    释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。 (也就是 被acquire阻塞的线程 )
    
    hasQueuedThreads()
    等待队列里是否还存在等待线程。
    
    getQueueLength()
    获取等待队列里阻塞的线程数。
    
    drainPermits()
    清空令牌把可用令牌数置为0,返回清空令牌的数量。
    
    availablePermits()
    返回可用的令牌数量。
    

    在实际中无非就3个步骤 创建令牌(许可) 获取令牌(许可) 释放令牌(许可)

    Semaphore实现原理

    Semaphore初始化(创建令牌)

    Semaphore semaphore=new Semaphore(2);

    1、当调用new Semaphore(2) 方法时,默认会创建一个非公平的锁的同步阻塞队列。

    2、把初始令牌数量赋值给同步队列的state状态,state的值就代表当前所剩余的令牌数量。

    获取令牌

    semaphore.acquire();

    1、当前线程会尝试去同步队列获取一个令牌,获取令牌的过程也就是使用原子的操作去修改同步队列的state ,获取一个令牌则修改为state=state-1。

    2、 当计算出来的state<0,则代表令牌数量不足,此时会创建一个Node节点加入阻塞队列,挂起当前线程。

    3、当计算出来的state>=0,则代表获取令牌成功。可以正常运行

    释放令牌

    semaphore.release();

    当调用semaphore.release() 方法时

    1、线程会尝试释放一个令牌,释放令牌的过程也就是把同步队列的state修改为state=state+1的过程

    2、释放令牌成功之后,同时会唤醒同步队列的所有阻塞节 进行state=state-1 的操作,如果state>=0则获取令牌成功 继续执行

    3、而其他的节点也会重新尝试去修改state=state-1 的操作,如果state>=0则获取令牌成功,否则重新进入阻塞队列,挂起线程。

    用semaphore 实现停车场提示牌功能。

    每个停车场入口都有一个提示牌,上面显示着停车场的剩余车位还有多少,当剩余车位为0时,不允许车辆进入停车场,直到停车场里面有车离开停车场,这时提示牌上会显示新的剩余车位数。

    业务场景 :

    1、停车场容纳总停车量10。

    2、当一辆车进入停车场后,显示牌的剩余车位数响应的减1.

    3、每有一辆车驶出停车场后,显示牌的剩余车位数响应的加1。

    4、停车场剩余车位不足时,车辆只能在外面等待。

    代码:

    //停车场同时容纳的车辆10
        private  static  Semaphore semaphore=new Semaphore(10);//一次能停10辆
    
        public static void main(String[] args) {
    
            //模拟100辆车进入停车场
            for(int i=0;i<100;i++){
    
                Thread thread=new Thread(new Runnable() {
                    public void run() {
                        try {
                            System.out.println("===="+Thread.currentThread().getName()+"来到停车场");
                            if(semaphore.availablePermits()==0){
                                System.out.println("车位不足,请耐心等待");
                            }
                            semaphore.acquire();//获取令牌尝试进入停车场
                            System.out.println(Thread.currentThread().getName()+"成功进入停车场");
                            Thread.sleep(new Random().nextInt(1000));//模拟每辆车辆在停车场停留的时间
                            System.out.println(Thread.currentThread().getName()+"驶出停车场");
                            semaphore.release();//释放令牌,腾出停车场车位
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                },i+"号车");
    
                thread.start();
    
            }
    
        }
    }
    
    

    大概9~10秒所有车都停过一遍了

    方式五:LockSupport实现线程间的阻塞和唤醒

    LockSupport 是一种非常灵活的实现线程间阻塞和唤醒的工具,使用它不用关注是等待线程先进行还是唤醒线程先运行,但是得知道线程的名字。

    LockSupport是什么

    刚刚开头提到过,LockSupport是一个线程工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,也可以在任意位置唤醒。

    它的内部其实两类主要的方法:park(阻塞线程)和unpark(启动唤醒线程)。

    注意上面的123方法,都有一个blocker,这个blocker是用来记录线程被阻塞时被谁阻塞的。用于线程监控和分析工具来定位原因的。

    现在我们知道了LockSupport是用来阻塞和唤醒线程的,而且之前相信我们都知道wait/notify也是用来阻塞和唤醒线程的,那么它相比,LockSupport有什么优点呢?

    2、与wait/notify对比

    这里假设你已经了解了wait/notify的机制,如果不了解,上面有自己看 ,很简单。既然学到了这个LockSupport,相信你已经提前已经学了wait/notify。

    我们先来举一个使用案例:

    上面这段代码的意思是,我们定义一个线程,但是在内部进行了park,因此需要unpark才能唤醒继续执行,不过上面,我们在MyThread进行的park,在main线程进行的unpark。

    这样来看,好像和wait/notify没有什么区别。那他的区别到底是什么呢?这个就需要仔细的观察了。这里主要有两点:

    (1)wait和notify都是Object中的方法,在调用这两个方法前必须先获得锁对象,但是park不需要获取某个对象的锁就可以锁住线程。

    (2)notify只能随机选择一个线程唤醒,无法唤醒指定的线程,unpark却可以唤醒一个指定的线程。

    区别就是这俩

    LockSupport使用

      public static void main(String[] args) {
                List<String> list = new ArrayList<>();
                // 实现线程B
                Thread threadB = new Thread(() -> {
                    if (list.size() != 5) {
                        LockSupport.park();
                    }
                    System.out.println("线程B收到通知,开始执行自己的业务...");
                });
    
                // 实现线程A
                Thread threadA = new Thread(() -> {
                    for (int i = 1; i <= 10; i++) {
                        list.add("abc");
                        System.out.println("线程A向list中添加一个元素,此时list中的元素个数为:" + list.size());
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        if (list.size() == 5)
                            LockSupport.unpark(threadB);
                    }
                });
    
                threadA.start();
                threadB.start();
            }
    
    的锁就可以锁住线程。
    
    (2)notify只能随机选择一个线程唤醒,无法唤醒指定的线程,unpark却可以唤醒一个指定的线程。
    
    区别就是这俩
    
    #### LockSupport使用
    
    ```java
      public static void main(String[] args) {
                List<String> list = new ArrayList<>();
                // 实现线程B
                Thread threadB = new Thread(() -> {
                    if (list.size() != 5) {
                        LockSupport.park();
                    }
                    System.out.println("线程B收到通知,开始执行自己的业务...");
                });
    
                // 实现线程A
                Thread threadA = new Thread(() -> {
                    for (int i = 1; i <= 10; i++) {
                        list.add("abc");
                        System.out.println("线程A向list中添加一个元素,此时list中的元素个数为:" + list.size());
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        if (list.size() == 5)
                            LockSupport.unpark(threadB);
                    }
                });
    
                threadA.start();
                threadB.start();
            }
    
    
    点赞 -收藏-关注-便于以后复习和收到最新内容
    有其他问题在评论区讨论-或者私信我-收到会在第一时间回复
    如有侵权,请私信联系我
    感谢,配合,希望我的努力对你有帮助^_^

    展开全文
  • 这篇博客是是我在学习了多线程并发和操作系统后,针对Java中的情况,为保证线程安全和线程的并发运行进行总结的博客。 看了我的博客后如果哪里我有理解不到位地方欢迎大家评论区给我留言,感谢支持。 volatile、...
  • Java多线程-线程通信

    2021-03-09 09:15:43
    通信方式要想实现线程之间的协同,如:线程执行先后顺序、获取某个线程执行的结果等等。涉及到线程之间的相互通信,分为下面四类:文件共享网络共享共享变量JDK提供的线程协调APIsuspend/resume、wait/notify...
  • Java多线程

    2021-02-28 14:03:03
    Java多线程,线程池有哪几类,每一类的差别 ThreadLocal原理,线程池工作流程 synchronized 实现原理 ReentrantLock源码,AQS,synchronized实现 Thread中,ThreadLocal,Lock等 一道线程通信问题,给你三个线程,...
  • while轮询 其实就是多线程同时执行,会牺牲部分CPU性能。 在这种方式下,线程A不断地改变条件,线程ThreadB不停地通过while语句检测这个条件(list.size()==5)是否成立 ,从而实现了线程间的通信。但是这种方式会...
  • java多线程-线程通信

    2021-02-28 14:00:34
    通过共享对象通信忙等待wait(),notify()和 notifyAll()丢失的信号假唤醒多线程等待相同信号不要对常量字符串或全局对象调用 wait()通过共享对象通信线程间发送信号的一个简单方式是在共享对象的变量里设置信号值。...
  • 并发操作系统会根据任务调度系统给线程分配线程的 CPU 执行时间,线程的执行会进行切换。 线程和进程的区别? 1、进程是资源分配的最小单位,线程是程序执行的最小单位(资源调度的最小单位)一个程序至少有一个...
  • 进程概述:在这之前,有必要了解一下什么是进程?在一个操作系统中,每个独立的执行的程序都可称为一个进程,也就是“正在运行的...多线程的概念:多线程是指一个应用程序中有许多条并发执行的线索,每条线索都被...
  • 1.1 什么是多线程1.1.1 计算机硬件计算机的核心硬件有磁盘、内存、CPU,磁盘用来持久化保存数据,CPU用于计算,内存是磁盘和CPU之间的一个缓冲区。说明:1. 磁盘读写太慢,CPU运算太快,如果CPU每次都到磁盘读写数据...
  • Java多线程基础

    2021-02-12 15:29:11
    一、程序,进程,线程等相关概念的理解程序是指未完成指定任务,用某种语言编写的一段代码指令...(线程作为调度和执行的单位,线程之间的切换开销小)每个线程都拥有自己独立的栈和程序计数器线程会共享同一个进...
  • java多线程

    2021-03-10 10:00:17
    第十七天知识点总结一、多线程进程:就是正在运行的程序,分配内存让应用程序能够运行。Windows 号称多任务(可以同时运行多个应用程序)。宏观上看:windows确实是运行了多个程序。微观上看:CPU快速切换执行任务,...
  • java多线程通信

    2021-03-01 06:25:56
    致我亲爱的知乎读者朋友:java多线程系列的笔记已经更新了4篇,之前的文章使用了代码加文字的形式,显得枯燥,容易让人反感,从这一次开始我将不再贴过多代码,主要是文字的描述为主,希望大家有什么建议可以留言给...
  • java多线程编程体会

    2021-03-16 17:34:33
    Java 程序中使用多线程要比在 C 或 C++ 中容易得多,这是因为 Java 编程语言提供了语言级的支持。本文通过简单的编程示例来说明 Java 程序中的多线程是多么直观。读完本文以后,用户应该能够编写简单的多线程程序...
  • 1.1 基本概念以及线程与进程之间的区别联系关于进程和线程,首先从定义上理解就有所不同:进程是具有一定独立功能的程序、它是系统进行资源分配和调度的一个独立单位,重点在系统调度和单独的单位,也就是说进程是...
  • Java多线程整理

    2021-03-01 06:23:38
    一、线程池过于频繁的创建/销毁线程浪费性能,线程并发数量过多,JVM调度是抢占式的,...线程池创建4种方式:newCachedThreadPool 缓存线程池,没有线程可用就创建,空闲线程60秒未使用将被回收。newFixedThreadPoo...
  • Java 多线程(超详细)

    千次阅读 2021-01-12 21:14:38
    多线程学习思路:为什么学习线程?为了解决CPU利用率问题,提高CPU利用率。 =》 什么是进程?什么是线程? =》 怎么创建线程?有哪几种方式?有什么特点? =》 分别怎么启动线程? =》 多线程带来了数据安全问题,该...
  • 一,介绍本总结我对于JAVA多线程中线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码。二,线程间的通信方式①同步这里讲的同步是指多个线程通过synchronized...
  • 在这篇文章中我们分析一下java多线程通信过程中出现的一个假死现象。然后给出一个解决办法。一、假死现象重现为了更好地演示我们的实例,我们使用生产者消费者模式,一边生产一边消费。打开UC浏览器 查看更多精彩图....
  • Java多线程(一文看懂!)

    2021-07-28 17:36:15
    二,多线程的实现方式 三,多线程的五大状态 四,多线程的调度 五,线程的同步(多口售票问题) 六,线程的协作(生产者-消费者模型) 一,多线程的介绍 百度中多线程的介绍(multithreading):是指从软件或者...
  • JAVA多线程.ppt

    2021-02-27 15:09:03
    106159278 Java多线程 目标 多线程的概念 如何创建线程 死锁的概念 线程同步 使用 wait() 和 notify() 在线程之间进行通信 线程生命控制 多线程的概念 多线程:程序中多个片断同时执行。 为了更好的了解线程,用...
  • java多线程其实在工作中接触的并不是很多,偶尔用一下,但是这个特性又是开发工程走向大牛必须要掌握的知识点,所以花几天时间整理了一下,一方便梳理知识点,另一方面也是为了以后更好地使用。 一. 线程和进程 ...
  • 一文搞懂Java多线程 首先理解线程和进程的概念! 进程 我们都知道计算机的核心是CPU,它承担了所有的计算任务,而操作系统是计算机的管理者,它负责任务的调度,资源的分配和管理,统领整个计算机硬件;应用程序...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 220,104
精华内容 88,041
关键字:

java多线程通讯方式

java 订阅