精华内容
下载资源
问答
  • } } 利用AtomicInteger 和volatile类似, 只是原子操作达到预估值非A即B 二、PipedInputStream、PipedOutputStream 这里用流在两个线程间通信,但是Java中的Stream是单向的,所以在两个线程中分别建了一个input和...

    文章目录

    一、使用同一个共享变量控制

    Synchronized、wait、notify

    public class Demo1 {

    private final List list =new ArrayList<>();

    public static void main(String[] args) {

    Demo1 demo =new Demo1();

    new Thread(()->{

    for (int i=0;i<10;i++){

    synchronized (demo.list){

    if(demo.list.size()%2==1){

    try {

    demo.list.wait();

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    demo.list.add(i);

    System.out.print(Thread.currentThread().getName());

    System.out.println(demo.list);

    demo.list.notify();

    }

    }

    }).start();

    new Thread(()->{

    for (int i=0;i<10;i++){

    synchronized (demo.list){

    if(demo.list.size()%2==0){

    try {

    demo.list.wait();

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    demo.list.add(i);

    System.out.print(Thread.currentThread().getName());

    System.out.println(demo.list);

    demo.list.notify();

    }

    }

    }).start();

    }

    }

    Lock、Condition

    public class Task {

    private final Lock lock = new ReentrantLock();

    private final Condition addConditon = lock.newCondition();

    private final Condition subConditon = lock.newCondition();

    private volatile int num = 0;

    private List list = new ArrayList<>();

    public void add() {

    for (int i = 0; i < 10; i++) {

    lock.lock();

    try {

    if (list.size() == 10) {

    addConditon.await();

    }

    num++;

    Thread.sleep(100);

    list.add("add " + num);

    System.out.println("The list size is " + list.size());

    System.out.println("The add thread is " + Thread.currentThread().getName());

    System.out.println("-------------");

    subConditon.signal();

    } catch (Exception e) {

    e.printStackTrace();

    } finally {

    lock.unlock();

    }

    }

    }

    public void sub() {

    for (int i = 0; i < 10; i++) {

    lock.lock();

    try {

    if (list.size() == 0) {

    subConditon.await();

    }

    num--;

    Thread.sleep(100);

    list.remove(0);

    System.out.println("The list size is " + list.size());

    System.out.println("The sub thread is " + Thread.currentThread().getName());

    System.out.println("-------------");

    addConditon.signal();

    } catch (Exception e) {

    e.printStackTrace();

    } finally {

    lock.unlock();

    }

    }

    }

    public static void main(String[] args) {

    Task task = new Task();

    new Thread(task::add).start();

    new Thread(task::sub).start();

    }

    }

    利用volatile

    volatile修饰的变量值直接存在主内存里面,子线程对该变量的读写直接写住内存,而不是像其它变量一样在local thread里面产生一份copy。volatile能保证所修饰的变量对于多个线程可见性,即只要被修改,其它线程读到的一定是最新的值。

    public class Demo2 {

    private volatile List list =new ArrayList<>();

    public static void main(String[] args) {

    Demo2 demo =new Demo2();

    new Thread(()->{

    for (int i=0;i<10;i++){

    demo.list.add(i);

    System.out.print(Thread.currentThread().getName());

    System.out.println(demo.list);

    }

    }).start();

    new Thread(()->{

    for (int i=0;i<10;i++){

    demo.list.add(i);

    System.out.print(Thread.currentThread().getName());

    System.out.println(demo.list);

    }

    }).start();

    }

    }

    利用AtomicInteger

    和volatile类似, 只是原子操作达到预估值非A即B

    09f47fa7bf2392f9d5b73e6cfcc9f2ba.png

    二、PipedInputStream、PipedOutputStream

    这里用流在两个线程间通信,但是Java中的Stream是单向的,所以在两个线程中分别建了一个input和output

    public class PipedDemo {

    private final PipedInputStream inputStream1;

    private final PipedOutputStream outputStream1;

    private final PipedInputStream inputStream2;

    private final PipedOutputStream outputStream2;

    public PipedDemo(){

    inputStream1 = new PipedInputStream();

    outputStream1 = new PipedOutputStream();

    inputStream2 = new PipedInputStream();

    outputStream2 = new PipedOutputStream();

    try {

    inputStream1.connect(outputStream2);

    inputStream2.connect(outputStream1);

    } catch (IOException e) {

    e.printStackTrace();

    }

    }

    /**程序退出时,需要关闭stream*/

    public void shutdown() throws IOException {

    inputStream1.close();

    inputStream2.close();

    outputStream1.close();

    outputStream2.close();

    }

    public static void main(String[] args) throws IOException {

    PipedDemo demo =new PipedDemo();

    new Thread(()->{

    PipedInputStream in = demo.inputStream2;

    PipedOutputStream out = demo.outputStream2;

    for (int i = 0; i < 10; i++) {

    try {

    byte[] inArr = new byte[2];

    in.read(inArr);

    System.out.print(Thread.currentThread().getName()+": "+i+" ");

    System.out.println(new String(inArr));

    while(true){

    if("go".equals(new String(inArr)))

    break;

    }

    out.write("ok".getBytes());

    } catch (IOException e) {

    e.printStackTrace();

    }

    }

    }).start();

    new Thread(()->{

    PipedInputStream in = demo.inputStream1;

    PipedOutputStream out = demo.outputStream1;

    for (int i = 0; i < 10; i++) {

    try {

    out.write("go".getBytes());

    byte[] inArr = new byte[2];

    in.read(inArr);

    System.out.print(Thread.currentThread().getName()+": "+i+" ");

    System.out.println(new String(inArr));

    while(true){

    if("ok".equals(new String(inArr)))

    break;

    }

    } catch (IOException e) {

    e.printStackTrace();

    }

    }

    }).start();

    // demo.shutdown();

    }

    }

    输出:

    Thread-0: 0 go

    Thread-1: 0 ok

    Thread-0: 1 go

    Thread-1: 1 ok

    Thread-0: 2 go

    Thread-1: 2 ok

    Thread-0: 3 go

    Thread-1: 3 ok

    Thread-0: 4 go

    Thread-1: 4 ok

    Thread-0: 5 go

    Thread-1: 5 ok

    Thread-0: 6 go

    Thread-1: 6 ok

    Thread-0: 7 go

    Thread-1: 7 ok

    Thread-0: 8 go

    Thread-1: 8 ok

    Thread-0: 9 go

    Thread-1: 9 ok

    三、利用BlockingQueue

    BlockingQueue定义的常用方法如下:

    add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常。

    offer(Object):表示如果可能的话,将Object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。

    put(Object):把Object加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里有空间再继续。

    poll(time):获取并删除BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。当不传入time值时,立刻返回。

    peek():立刻获取BlockingQueue里排在首位的对象,但不从队列里删除,如果队列为空,则返回null。

    take():获取并删除BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。

    BlockingQueue有四个具体的实现类:

    ArrayBlockingQueue:数组阻塞队列,规定大小,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。

    LinkedBlockingQueue:链阻塞队列,大小不定,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO顺序排序的。

    PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。

    SynchronousQueue:特殊的BlockingQueue,它的内部同时只能够容纳单个元素,对其的操作必须是放和取交替完成的。

    DelayQueue:延迟队列,注入其中的元素必须实现 java.util.concurrent.Delayed 接口

    所有BlockingQueue的使用方式类似,以下例子一个线程写入,一个线程读取,操作的是同一个Queue:

    public class BlockingQueueDemo {

    public static void main(String[] args) {

    LinkedBlockingQueue queue = new LinkedBlockingQueue<>();

    //读线程

    new Thread(() -> {

    int i =0;

    while (true) {

    try {

    String item = queue.take();

    System.out.print(Thread.currentThread().getName() + ": " + i + " ");

    System.out.println(item);

    i++;

    } catch (Exception e) {

    e.printStackTrace();

    }

    }

    }).start();

    //写线程

    new Thread(() -> {

    for (int i = 0; i < 10; i++) {

    try {

    String item = "go"+i;

    System.out.print(Thread.currentThread().getName() + ": " + i + " ");

    System.out.println(item);

    queue.put(item);

    } catch (Exception e) {

    e.printStackTrace();

    }

    }

    }).start();

    }

    }

    四、利用LockSupport

    用LockSupport的unpark()和park()方法,实现线程间通信。

    d0cfabefc3a2a6046ee225f5c15b2e01.png

    五、利用ThreadLocal

    ThreadLocal,即线程变量,是一个以 ThreadLocal 对象为键、任意对象为值的存储结构。这个结构被依附在线程上,也就是说一个线程可以根据一个 ThreadLocal 对象查询到绑定在这个线程上的一个值。

    可以通过 set(T) 方法来设置一个值,在当前线程下再通过 get() 方法获取到原先设置的值。

    public class ThreadLocalDemo {

    private static final ThreadLocal TIME_THREADLOCAL = new ThreadLocal<>() {

    @Override

    protected Long initialValue() {

    return System.currentTimeMillis();

    }

    };

    public static final void begin() {

    TIME_THREADLOCAL.set(System.currentTimeMillis());

    }

    public static final long end() {

    return System.currentTimeMillis() - TIME_THREADLOCAL.get();

    }

    public static void main(String[] args) throws InterruptedException {

    ThreadLocalDemo.begin();

    Thread.sleep(2000);

    System.out.println(ThreadLocalDemo.end());

    }

    }

    //输出 2003

    展开全文
  • Java线程间通信

    2021-02-28 16:38:52
    Java线程间通信下一节>上述例题无条件的阻塞了其他线程异步访问某个方法。Java对象中隐式管程的应用是很强大的,但是你可以通过进程间通信达到更微妙的境界。这在Java中是尤为简单的。像前面所讨论过的,多线程...

    Java线程间通信

    下一节>

    上述例题无条件的阻塞了其他线程异步访问某个方法。Java对象中隐式管程的应用是很强大的,但是你可以通过进程间通信达到更微妙的境界。这在Java中是尤为简单的。

    像前面所讨论过的,多线程通过把任务分成离散的和合乎逻辑的单元代替了事件循环程序。线程还有第二优点:它远离了轮询。轮询通常由重复监测条件的循环实现。一旦条件成立,就要采取适当的行动。这浪费了CPU时间。举例来说,考虑经典的序列问题,当一个线程正在产生数据而另一个程序正在消费它。为使问题变得更有趣,假设数据产生器必须等待消费者完成工作才能产生新的数据。在轮询系统,消费者在等待生产者产生数据时会浪费很多CPU周期。一旦生产者完成工作,它将启动轮询,浪费更多的CPU时间等待消费者的工作结束,如此下去。很明显,这种情形不受欢迎。

    为避免轮询,Java包含了通过wait( ),notify( )和notifyAll( )方法实现的一个进程间通信机制。这些方法在对象中是用final方法实现的,所以所有的类都含有它们。这三个方法仅在synchronized方法中才能被调用。尽管这些方法从计算机科学远景方向上来说具有概念的高度先进性,实际中用起来是很简单的:

    wait( ) 告知被调用的线程放弃管程进入睡眠直到其他线程进入相同管程并且调用notify( )。

    notify( ) 恢复相同对象中第一个调用 wait( ) 的线程。

    notifyAll( ) 恢复相同对象中所有调用 wait( ) 的线程。具有最高优先级的线程最先运行。

    这些方法在Object中被声明,如下所示:

    final void wait( ) throws InterruptedException

    final void notify( )

    final void notifyAll( )

    wait( )存在的另外的形式允许你定义等待时间。

    下面的例子程序错误的实行了一个简单生产者/消费者的问题。它由四个类组成:Q,设法获得同步的序列;Producer,产生排队的线程对象;Consumer,消费序列的线程对象;以及PC,创建单个Q,Producer,和Consumer的小类。

    // An incorrect implementation of a producer and consumer.

    class Q {

    int n;

    synchronized int get() {

    System.out.println("Got: " + n);

    return n;

    }

    synchronized void put(int n) {

    this.n = n;

    System.out.println("Put: " + n);

    }

    }

    class Producer implements Runnable {

    Q q;

    Producer(Q q) {

    this.q = q;

    new Thread(this, "Producer").start();

    }

    public void run() {

    int i = 0;

    while(true) {

    q.put(i++);

    }

    }

    }

    class Consumer implements Runnable {

    Q q;

    Consumer(Q q) {

    this.q = q;

    new Thread(this, "Consumer").start();

    }

    public void run() {

    while(true) {

    q.get();

    }

    }

    }

    class PC {

    public static void main(String args[]) {

    Q q = new Q();

    new Producer(q);

    new Consumer(q);

    System.out.println("Press Control-C to stop.");

    }

    }

    尽管Q类中的put( )和get( )方法是同步的,没有东西阻止生产者超越消费者,也没有东西阻止消费者消费同样的序列两次。这样,你就得到下面的错误输出(输出将随处理器速度和装载的任务而改变):

    Put: 1

    Got: 1

    Got: 1

    Got: 1

    Got: 1

    Got: 1

    Put: 2

    Put: 3

    Put: 4

    Put: 5

    Put: 6

    Put: 7

    Got: 7

    生产者生成1后,消费者依次获得同样的1五次。生产者在继续生成2到7,消费者没有机会获得它们。

    用Java正确的编写该程序是用wait( )和notify( )来对两个方向进行标志,如下所示:

    // A correct implementation of a producer and consumer.

    class Q {

    int n;

    boolean valueSet = false;

    synchronized int get() {

    if(!valueSet)

    try {

    wait();

    } catch(InterruptedException e) {

    System.out.println("InterruptedException caught");

    }

    System.out.println("Got: " + n);

    valueSet = false;

    notify();

    return n;

    }

    synchronized void put(int n) {

    if(valueSet)

    try {

    wait();

    } catch(InterruptedException e) {

    System.out.println("InterruptedException caught");

    }

    this.n = n;

    valueSet = true;

    System.out.println("Put: " + n);

    notify();

    }

    }

    class Producer implements Runnable {

    Q q;

    Producer(Q q) {

    this.q = q;

    new Thread(this, "Producer").start();

    }

    public void run() {

    int i = 0;

    while(true) {

    q.put(i++);

    }

    }

    }

    class Consumer implements Runnable {

    Q q;

    Consumer(Q q) {

    this.q = q;

    new Thread(this, "Consumer").start();

    }

    public void run() {

    while(true) {

    q.get();

    }

    }

    }

    class PCFixed {

    public static void main(String args[]) {

    Q q = new Q();

    new Producer(q);

    new Consumer(q);

    System.out.println("Press Control-C to stop.");

    }

    }

    内部get( ), wait( )被调用。这使执行挂起直到Producer 告知数据已经预备好。这时,内部get( ) 被恢复执行。获取数据后,get( )调用notify( )。这告诉Producer可以向序列中输入更多数据。在put( )内,wait( )挂起执行直到Consumer取走了序列中的项目。当执行再继续,下一个数据项目被放入序列,notify( )被调用,这通知Consumer它应该移走该数据。

    下面是该程序的输出,它清楚的显示了同步行为:

    Put: 1

    Got: 1

    Put: 2

    Got: 2

    Put: 3

    Got: 3

    Put: 4

    Got: 4

    Put: 5

    Got: 5

    下一节>

    展开全文
  • 在这种方式下,线程A不断地改变条件,线程ThreadB不停地通过while语句检测这个条件(list.size()==5)是否成立 ,从而实现了线程间通信。但是这种方式会浪费CPU资源。之所以说它浪费资源,是因为JVM调度器将CPU交给...

    synchronized同步

    public class MyObject {

    synchronized public void methodA() {

    //do something....

    }

    synchronized public void methodB() {

    //do some other thing

    }

    }

    public class ThreadA extends Thread {

    private MyObject object;

    //省略构造方法

    @Override

    public void run() {

    super.run();

    object.methodA();

    }

    }

    public class ThreadB extends Thread {

    private MyObject object;

    //省略构造方法

    @Override

    public void run() {

    super.run();

    object.methodB();

    }

    }

    public class Run {

    public static void main(String[] args) {

    MyObject object = new MyObject();

    //线程A与线程B 持有的是同一个对象:object

    ThreadA a = new ThreadA(object);

    ThreadB b = new ThreadB(object);

    a.start();

    b.start();

    }

    }

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    由于线程A和线程B持有同一个MyObject类的对象object,尽管这两个线程需要调用不同的方法,但是它们是同步执行的,比如:线程B需要等待线程A执行完了methodA()方法之后,它才能执行methodB()方法。这样,线程A和线程B就实现了通信。

    这种方式,本质上就是“共享内存”式的通信。多个线程需要访问同一个共享变量,谁拿到了锁(获得了访问权限),谁就可以执行。

    while轮询

    其实就是多线程同时执行,会牺牲部分CPU性能。

    在这种方式下,线程A不断地改变条件,线程ThreadB不停地通过while语句检测这个条件(list.size()==5)是否成立 ,从而实现了线程间的通信。但是这种方式会浪费CPU资源。之所以说它浪费资源,是因为JVM调度器将CPU交给线程B执行时,它没做啥“有用”的工作,只是在不断地测试 某个条件是否成立。就类似于现实生活中,某个人一直看着手机屏幕是否有电话来了,而不是: 在干别的事情,当有电话来时,响铃通知TA电话来了。

    import java.util.ArrayList;

    import java.util.List;

    public class MyList {

    private List list = new ArrayList();

    public void add() {

    list.add("elements");

    }

    public int size() {

    return list.size();

    }

    }

    import mylist.MyList;

    public class ThreadA extends Thread {

    private MyList list;

    public ThreadA(MyList list) {

    super();

    this.list = list;

    }

    @Override

    public void run() {

    try {

    for (int i = 0; i < 10; i++) {

    list.add();

    System.out.println("添加了" + (i + 1) + "个元素");

    Thread.sleep(1000);

    }

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    }

    import mylist.MyList;

    public class ThreadB extends Thread {

    private MyList list;

    public ThreadB(MyList list) {

    super();

    this.list = list;

    }

    @Override

    public void run() {

    try {

    while (true) {

    if (list.size() == 5) {

    System.out.println("==5, 线程b准备退出了");

    throw new InterruptedException();

    }

    }

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    }

    import mylist.MyList;

    import extthread.ThreadA;

    import extthread.ThreadB;

    public class Test {

    public static void main(String[] args) {

    MyList service = new MyList();

    ThreadA a = new ThreadA(service);

    a.setName("A");

    a.start();

    ThreadB b = new ThreadB(service);

    b.setName("B");

    b.start();

    }

    }

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    81

    82

    83

    wait/notify机制

    public class MyList {

    private static List list = new ArrayList();

    public static void add() {

    list.add("anyString");

    }

    public static int size() {

    return list.size();

    }

    }

    public class ThreadA extends Thread {

    private Object lock;

    public ThreadA(Object lock) {

    super();

    this.lock = lock;

    }

    @Override

    public void run() {

    try {

    synchronized (lock) {

    if (MyList.size() != 5) {

    System.out.println("wait begin " + System.currentTimeMillis());

    lock.wait();

    System.out.println("Interruption!!!");

    //lock.wait();

    lock.notify();

    lock.wait();

    System.out.println("wait end " + System.currentTimeMillis());

    }

    }

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    }

    public class ThreadB extends Thread {

    private Object lock;

    public ThreadB(Object lock) {

    super();

    this.lock = lock;

    }

    @Override

    public void run() {

    try {

    synchronized (lock) {

    for (int i = 0; i < 10; i++) {

    MyList.add();

    if (MyList.size() == 5) {

    lock.notify();

    System.out.println("已经发出了通知");

    lock.wait();

    }

    System.out.println("添加了" + (i + 1) + "个元素!");

    Thread.sleep(1000);

    }

    }

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    }

    public class Test {

    public static void main(String[] args) {

    try {

    Object lock = new Object();

    ThreadA a = new ThreadA(lock);

    a.start();

    Thread.sleep(50);

    ThreadB b = new ThreadB(lock);

    b.start();

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    }

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    81

    82

    83

    84

    85

    86

    87

    88

    89

    90

    91

    wait begin 1498007974397

    添加了1个元素!

    添加了2个元素!

    添加了3个元素!

    添加了4个元素!

    已经发出了通知

    Interruption!!!

    添加了5个元素!

    添加了6个元素!

    添加了7个元素!

    添加了8个元素!

    添加了9个元素!

    添加了10个元素!

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    线程A要等待某个条件满足时(list.size()==5),才执行操作。线程B则向list中添加元素,改变list 的size。

    A,B之间如何通信的呢?也就是说,线程A如何知道 list.size() 已经为5了呢?

    这里用到了Object类的 wait() 和 notify() 方法。

    当条件未满足时(list.size() !=5),线程A调用wait() 放弃CPU,并进入阻塞状态。—不像②while轮询那样占用CPU

    当条件满足时,线程B调用 notify()通知 线程A,所谓通知线程A,就是唤醒线程A,并让它进入可运行状态。

    这种方式的一个好处就是CPU的利用率提高了。

    管道通信

    管道流主要用来实现两个线程之间的二进制数据的传播,下面以PipedInputStream类和PipedOutputStream类为例,实现生产者-消费者:

    package test.pipe;

    import java.io.IOException;

    import java.io.PipedInputStream;

    import java.io.PipedOutputStream;

    /**

    * 我们以数字替代产品 生产者每5秒提供5个产品,放入管道

    */

    class MyProducer extends Thread {

    private PipedOutputStream outputStream;

    private int index = 0;

    public MyProducer(PipedOutputStream outputStream) {

    this.outputStream = outputStream;

    }

    @Override

    public void run() {

    while (true) {

    try {

    for (int i = 0; i < 5; i++) {

    outputStream.write(index++);

    }

    } catch (IOException e) {

    e.printStackTrace();

    }

    try {

    Thread.sleep(5000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    }

    }

    /**

    * 消费者每0.5秒从管道中取1件产品,并打印剩余产品数量,并打印产品信息(以数字替代)

    */

    class MyConsumer extends Thread {

    private PipedInputStream inputStream;

    public MyConsumer(PipedInputStream inputStream) {

    this.inputStream = inputStream;

    }

    @Override

    public void run() {

    while (true) {

    try {

    Thread.sleep(500);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    try {

    int count = inputStream.available();

    if (count > 0) {

    System.out.println("rest product count: " + count);

    System.out.println("get product: " + inputStream.read());

    }

    } catch (IOException e1) {

    e1.printStackTrace();

    }

    }

    }

    }

    public class PipeTest1 {

    public static void main(String[] args) {

    PipedOutputStream pos = new PipedOutputStream();

    PipedInputStream pis = new PipedInputStream();

    try {

    pis.connect(pos);

    } catch (IOException e) {

    e.printStackTrace();

    }

    new MyProducer(pos).start();

    new MyConsumer(pis).start();

    }

    }

    ---------------------

    作者:Hadwin1991

    来源:CSDN

    原文:https://blog.csdn.net/Hadwin1991/article/details/73527835

    版权声明:本文为博主原创文章,转载请附上博文链接!

    展开全文
  • JAVA线程间通信简介

    2021-02-12 09:33:03
    本文将讲解以下几个JAVA线程间通信的主题:1、通过共享对象通信线程发送信号的一个简单方式是在共享对象的变量里设置信号值。线程A在一个同步块里设置boolean型成员变量hasDataToProcess为true,线程B也在同步块里...

    线程通信的目标是使线程间能够互相发送信号。另一方面,线程通信使线程能够等待其他线程的信号。

    6a6ae4289ad973b10e50a1de17eeb12b.png

    例如,线程B可以等待线程A的一个信号,这个信号会通知线程B数据已经准备好了。本文将讲解以下几个JAVA线程间通信的主题:

    1、通过共享对象通信

    线程间发送信号的一个简单方式是在共享对象的变量里设置信号值。线程A在一个同步块里设置boolean型成员变量hasDataToProcess为true,线程B也在同步块里读取hasDataToProcess这个成员变量。这个简单的例子使用了一个持有信号的对象,并提供了set和check方法:

    public class MySignal{

    protected boolean hasDataToProcess = false;

    public synchronized boolean hasDataToProcess(){

    return this.hasDataToProcess;

    }

    public synchronized void setHasDataToProcess(boolean hasData){

    this.hasDataToProcess = hasData;

    }

    }

    线程A和B必须获得指向一个MySignal共享实例的引用,以便进行通信。如果它们持有的引用指向不同的MySingal实例,那么彼此将不能检测到对方的信号。需要处理的数据可以存放在一个共享缓存区里,它和MySignal实例是分开存放的。

    2、忙等待(Busy Wait)

    准备处理数据的线程B正在等待数据变为可用。换句话说,它在等待线程A的一个信号,这个信号使hasDataToProcess()返回true。线程B运行在一个循环里,以等待这个信号:

    protected MySignal sharedSignal = ...

    ...

    while(!sharedSignal.hasDataToProcess()){

    //do nothing... busy waiting

    }

    3、wait(),notify()和notifyAll()

    忙等待没有对运行等待线程的CPU进行有效的利用,除非平均等待时间非常短。否则,让等待线程进入睡眠或者非运行状态更为明智,直到它接收到它等待的信号。

    Java有一个内建的等待机制来允许线程在等待信号的时候变为非运行状态。java.lang.Object 类定义了三个方法,wait()、notify()和notifyAll()来实现这个等待机制。

    一个线程一旦调用了任意对象的wait()方法,就会变为非运行状态,直到另一个线程调用了同一个对象的notify()方法。为了调用wait()或者notify(),线程必须先获得那个对象的锁。也就是说,线程必须在同步块里调用wait()或者notify()。以下是MySingal的修改版本——使用了wait()和notify()的MyWaitNotify:

    public class MonitorObject{

    }

    public class MyWaitNotify{

    MonitorObject myMonitorObject = new MonitorObject();

    public void doWait(){

    synchronized(myMonitorObject){

    try{

    myMonitorObject.wait();

    } catch(InterruptedException e){...}

    }

    }

    public void doNotify(){

    synchronized(myMonitorObject){

    myMonitorObject.notify();

    }

    }

    }

    等待线程将调用doWait(),而唤醒线程将调用doNotify()。当一个线程调用一个对象的notify()方法,正在等待该对象的所有线程中将有一个线程被唤醒并允许执行(校注:这个将被唤醒的线程是随机的,不可以指定唤醒哪个线程)。同时也提供了一个notifyAll()方法来唤醒正在等待一个给定对象的所有线程。

    如你所见,不管是等待线程还是唤醒线程都在同步块里调用wait()和notify()。这是强制性的!一个线程如果没有持有对象锁,将不能调用wait(),notify()或者notifyAll()。否则,会抛出IllegalMonitorStateException异常。

    (校注:JVM是这么实现的,当你调用wait时候它首先要检查下当前线程是否是锁的拥有者,不是则抛出IllegalMonitorStateExcept,参考JVM源码的 1422行。)

    但是,这怎么可能?等待线程在同步块里面执行的时候,不是一直持有监视器对象(myMonitor对象)的锁吗?等待线程不能阻塞唤醒线程进入doNotify()的同步块吗?答案是:的确不能。一旦线程调用了wait()方法,它就释放了所持有的监视器对象上的锁。这将允许其他线程也可以调用wait()或者notify()。

    一旦一个线程被唤醒,不能立刻就退出wait()的方法调用,直到调用notify()的线程退出了它自己的同步块。换句话说:被唤醒的线程必须重新获得监视器对象的锁,才可以退出wait()的方法调用,因为wait方法调用运行在同步块里面。如果多个线程被notifyAll()唤醒,那么在同一时刻将只有一个线程可以退出wait()方法,因为每个线程在退出wait()前必须获得监视器对象的锁。

    4、丢失的信号(Missed Signals)

    notify()和notifyAll()方法不会保存调用它们的方法,因为当这两个方法被调用时,有可能没有线程处于等待状态。通知信号过后便丢弃了。因此,如果一个线程先于被通知线程调用wait()前调用了notify(),等待的线程将错过这个信号。这可能是也可能不是个问题。不过,在某些情况下,这可能使等待线程永远在等待,不再醒来,因为线程错过了唤醒信号。

    为了避免丢失信号,必须把它们保存在信号类里。在MyWaitNotify的例子中,通知信号应被存储在MyWaitNotify实例的一个成员变量里。以下是MyWaitNotify的修改版本:

    public class MyWaitNotify2{

    MonitorObject myMonitorObject = new MonitorObject();

    boolean wasSignalled = false;

    public void doWait(){

    synchronized(myMonitorObject){

    if(!wasSignalled){

    try{

    myMonitorObject.wait();

    } catch(InterruptedException e){...}

    }

    //clear signal and continue running.

    wasSignalled = false;

    }

    }

    public void doNotify(){

    synchronized(myMonitorObject){

    wasSignalled = true;

    myMonitorObject.notify();

    }

    }

    }

    留意doNotify()方法在调用notify()前把wasSignalled变量设为true。同时,留意doWait()方法在调用wait()前会检查wasSignalled变量。事实上,如果没有信号在前一次doWait()调用和这次doWait()调用之间的时间段里被接收到,它将只调用wait()。

    (校注:为了避免信号丢失, 用一个变量来保存是否被通知过。在notify前,设置自己已经被通知过。在wait后,设置自己没有被通知过,需要等待通知。)

    5、假唤醒

    由于莫名其妙的原因,线程有可能在没有调用过notify()和notifyAll()的情况下醒来。这就是所谓的假唤醒(spurious wakeups)。无端端地醒过来了。

    如果在MyWaitNotify2的doWait()方法里发生了假唤醒,等待线程即使没有收到正确的信号,也能够执行后续的操作。这可能导致你的应用程序出现严重问题。

    为了防止假唤醒,保存信号的成员变量将在一个while循环里接受检查,而不是在if表达式里。这样的一个while循环叫做自旋锁(校注:这种做法要慎重,目前的JVM实现自旋会消耗CPU,如果长时间不调用doNotify方法,doWait方法会一直自旋,CPU会消耗太大)。被唤醒的线程会自旋直到自旋锁(while循环)里的条件变为false。以下MyWaitNotify2的修改版本展示了这点:

    public class MyWaitNotify3{

    MonitorObject myMonitorObject = new MonitorObject();

    boolean wasSignalled = false;

    public void doWait(){

    synchronized(myMonitorObject){

    while(!wasSignalled){

    try{

    myMonitorObject.wait();

    } catch(InterruptedException e){...}

    }

    //clear signal and continue running.

    wasSignalled = false;

    }

    }

    public void doNotify(){

    synchronized(myMonitorObject){

    wasSignalled = true;

    myMonitorObject.notify();

    }

    }

    }

    留意wait()方法是在while循环里,而不在if表达式里。如果等待线程没有收到信号就唤醒,wasSignalled变量将变为false,while循环会再执行一次,促使醒来的线程回到等待状态。

    6、多个线程等待相同信号

    如果你有多个线程在等待,被notifyAll()唤醒,但只有一个被允许继续执行,使用while循环也是个好方法。每次只有一个线程可以获得监视器对象锁,意味着只有一个线程可以退出wait()调用并清除wasSignalled标志(设为false)。一旦这个线程退出doWait()的同步块,其他线程退出wait()调用,并在while循环里检查wasSignalled变量值。但是,这个标志已经被第一个唤醒的线程清除了,所以其余醒来的线程将回到等待状态,直到下次信号到来。

    7、不要在字符串常量或全局对象中调用wait()

    (校注:本章说的字符串常量指的是值为常量的变量)

    本文早期的一个版本在MyWaitNotify例子里使用字符串常量(”")作为管程对象。以下是那个例子:

    public class MyWaitNotify{

    String myMonitorObject = "";

    boolean wasSignalled = false;

    public void doWait(){

    synchronized(myMonitorObject){

    while(!wasSignalled){

    try{

    myMonitorObject.wait();

    } catch(InterruptedException e){...}

    }

    //clear signal and continue running.

    wasSignalled = false;

    }

    }

    public void doNotify(){

    synchronized(myMonitorObject){

    wasSignalled = true;

    myMonitorObject.notify();

    }

    }

    }

    在空字符串作为锁的同步块(或者其他常量字符串)里调用wait()和notify()产生的问题是,JVM/编译器内部会把常量字符串转换成同一个对象。这意味着,即使你有2个不同的MyWaitNotify实例,它们都引用了相同的空字符串实例。同时也意味着存在这样的风险:在第一个MyWaitNotify实例上调用doWait()的线程会被在第二个MyWaitNotify实例上调用doNotify()的线程唤醒。这种情况可以画成以下这张图:

    strings-wait-notify.png

    起初这可能不像个大问题。毕竟,如果doNotify()在第二个MyWaitNotify实例上被调用,真正发生的事不外乎线程A和B被错误的唤醒了 。这个被唤醒的线程(A或者B)将在while循环里检查信号值,然后回到等待状态,因为doNotify()并没有在第一个MyWaitNotify实例上调用,而这个正是它要等待的实例。这种情况相当于引发了一次假唤醒。线程A或者B在信号值没有更新的情况下唤醒。但是代码处理了这种情况,所以线程回到了等待状态。记住,即使4个线程在相同的共享字符串实例上调用wait()和notify(),doWait()和doNotify()里的信号还会被2个MyWaitNotify实例分别保存。在MyWaitNotify1上的一次doNotify()调用可能唤醒MyWaitNotify2的线程,但是信号值只会保存在MyWaitNotify1里。

    问题在于,由于doNotify()仅调用了notify()而不是notifyAll(),即使有4个线程在相同的字符串(空字符串)实例上等待,只能有一个线程被唤醒。所以,如果线程A或B被发给C或D的信号唤醒,它会检查自己的信号值,看看有没有信号被接收到,然后回到等待状态。而C和D都没被唤醒来检查它们实际上接收到的信号值,这样信号便丢失了。这种情况相当于前面所说的丢失信号的问题。C和D被发送过信号,只是都不能对信号作出回应。

    如果doNotify()方法调用notifyAll(),而非notify(),所有等待线程都会被唤醒并依次检查信号值。线程A和B将回到等待状态,但是C或D只有一个线程注意到信号,并退出doWait()方法调用。C或D中的另一个将回到等待状态,因为获得信号的线程在退出doWait()的过程中清除了信号值(置为false)。

    看过上面这段后,你可能会设法使用notifyAll()来代替notify(),但是这在性能上是个坏主意。在只有一个线程能对信号进行响应的情况下,没有理由每次都去唤醒所有线程。

    所以:在wait()/notify()机制中,不要使用全局对象,字符串常量等。应该使用对应唯一的对象。例如,每一个MyWaitNotify3的实例(前一节的例子)拥有一个属于自己的监视器对象,而不是在空字符串上调用wait()/notify()。

    校注:

    <

    p>管程 (英语:Monitors,也称为监视器) 是对多个工作线程实现互斥访问共享资源的对象或模块。这些共享资源一般是硬件设备或一群变量。管程实现了在一个时间点,最多只有一个线程在执行它的某个子程序。与那些通过修改数据结构实现互斥访问的并发程序设计相比,管程很大程度上简化了程序设计。

    <

    p>

    原文 Thread Signaling

    译者:杜建雄 校对:方腾飞

    via ifeve

    展开全文
  • 一,介绍本总结我对于JAVA线程线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码。二,线程间通信方式①同步这里讲的同步是指多个线程通过synchronized...
  • JAVA线程线程间通信方式解析一,介绍本总结我对于JAVA线程线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码。二,线程间的.通信方式①同步这里讲的...
  • Java线程间通信方式

    2021-03-01 07:59:24
    Java线程间通信的方式线程通信方式①同步这里讲的同步是指多个线程通过 synchronized 关键字这种方式来实现线程的通信。参考示例:[](javascript:void(0)????public class MyObject {synchronized public void...
  • java线程间通信方式

    2021-03-12 21:42:51
    java线程间通信方式1. 共享变量2. 等待/通知3. 管道流1. 共享变量volatile修饰的变量,线程可见,可使用这种变量作为线程传递消息的媒介;延伸出来的,还有redis中的值,数据库中的值,都可以作为线程共同...
  • 1.1 基本概念以及线程与进程之间的区别联系关于进程和线程,首先从定义上理解就有所不同:进程是具有一定独立功能的程序、它是系统进行资源分配和调度的一个独立单位,重点在系统调度和单独的单位,也就是说进程是...
  • 本总结我对于JAVA线程线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码,具体内容如下①同步这里讲的同步是指多个线程通过synchronized关键字这种方式来实现...
  • 在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。进程是一个具有独立功能的程序关于某个数据集合的一次运行活动。它可以申请和拥有系统资源,是一个...
  • 一,介绍本总结我对于JAVA线程线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码。二,线程间通信方式①同步这里讲的同步是指多个线程通过synchronized...
  • 一般用于控制并发线程数,及线程间互斥。另外重入锁 ReentrantLock 也可以实现该功能,但实现上要复杂些。功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?...
  • 线程间通信 概念:多个线程在处理同一个资源,但是处理的动作(线程的任务)却不同 为什么要处理线程间通信: 多个线程并发执行时,默认情况下cpu是随机切换线程的,当我们需要多个线程来共同完成一...
  • 5.1 分布式学习面试资料 5.2 分布式面试专题系列:缓存+限流+通讯 总结 其他的内容都可以按照路线图里面整理出来的知识点逐一去熟悉,学习,消化,不建议你去看书学习,最好是多看一些视频,把不懂地方反复看,学习...
  • 一、管道流是JAVA线程通讯的常用方式之一,基本流程如下:1)创建管道输出流PipedOutputStream pos和管道输入流PipedInputStream pis2)将pos和pis匹配,pos.connect(pis);3)将pos赋给信息输入线程,pis赋给信息获取...
  • 文章目录volatile、synchronized、ReentrantLock/Condition 来实现线程间通信 这篇博客是是我在学习了多线程并发和操作系统后,针对Java中的情况,为保证线程安全和线程的并发运行进行总结的博客。 看了我的博客后...
  • 5 Java线程间通信

    2021-03-03 14:46:56
    但是当我们需要多个线程之间相互协作的时候,就需要我们掌握Java线程通信方式。本文将介绍Java线程之间的几种通信原理。5.1 锁与同步在Java中,锁的概念都是基于对象的,所以我们又经常称它为对象锁。线程和锁的...
  • Java线程间通信-回调的实现方式Java线程间通信是非常复杂的问题的。线程间通信问题本质上是如何将与线程相关的变量或者对象传递给别的线程,从而实现交互。比如举一个简单例子,有一个多线程的类,用来计算文件的MD5...
  • java线程间通讯的几种方式

    千次阅读 多人点赞 2021-02-04 16:38:10
    这个时候就需要线程间进行通讯 A执行完了后对B说一声,喂B,我执行完了 来康康用Java怎么实现 1、基于synchronized 2、基于reentrantLock 3、基于volatile 4、基于countDownLatch 我目前就知道这四种 1、...
  • Android进程线程间通信方式

    千次阅读 2021-01-17 13:22:41
    线程自己基本上不拥有系统资源,只拥有一些在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。区别:(1)、一个程序至少有一个进程,一个进...
  • 这篇文章主要讲解Java线程之间的几种通信方式:wait() + notify() (Object)await() + signal() (condition 条件锁)join方式前三种通信方式主要通过一种线程同步的场景进行讲解:假设有a()、b()、c()两个任务分别有...
  • Linux系统中的进程间通信方式主要以下几种:同一主机上的进程通信方式* UNIX进程间通信方式: 包括管道(PIPE), 有名管道(FIFO), 和信号(Signal)* System V进程通信方式:包括信号量(Semaphore), 消息队列(Message ...
  • 【多线程高并发】Java线程间通信方式 Java线程间通信方式: 共享变量; 等待/通知; 管道流; 1.共享变量 volatile修饰的变量,线程可见,可使用这种变量作为线程传递消息的媒介; 2.等待/通知 同步...
  • 不过有的时候,我们希望多个线程协同工作来完成某个任务,这时就涉及到了线程间通信了。本文涉及到的知识点:thread.join(), object.wait(), object.notify(), CountdownLatch, CyclicBarrier, FutureTask, Callable...
  • Java线程通讯方式

    2021-02-27 09:53:33
    l 休眠唤醒方式:Object的wait、notify、notifyAllCondition的await、signal、signalAlll CountDownLatch:用于某个线程A等待若干个其他线程执行完之后,它才执行l CyclicBarrier:一组线程等待至某个状态之后再全部...
  • 一、进程间通信(IPC,Inter-Process Communication)是指在不同进程传播或交换信息 1. 无名管道 特点 半双工(数据流向仅有一个方向),具有固定的读端和写端 只能用于父进程或兄弟线程之间通信(具有血缘关系的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 229,923
精华内容 91,969
关键字:

java线程间的通讯方式

java 订阅