精华内容
下载资源
问答
  • Dubbo生产者消费者示例源代码,Eclipse项目,基于Maven构建,可以直接导入Eclipse开发环境,本人亲测,编译通过,可以直接在Eclipse中运行(包含Main方法),先运行提供者,再运行生产者
  • 实现生产者消费者的三种方式

    万次阅读 多人点赞 2019-10-14 21:22:03
    文章目录wait/notify的消息通知机制预备知识wait/notify消息通知潜在的一些问题notify过早通知等待wait的条件发生变化假死状态wait/notifyAll实现生产者-消费者使用Lock中Condition的await/signalAll实现生产者-消费...

    生产者-消费者模式是一个十分经典的多线程并发协作的模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深。所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会有一个共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:

    1. 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;
    2. 如果共享数据区为空的话,阻塞消费者继续消费数据;

    在实现生产者消费者问题时,可以采用三种方式:

    • 使用Object的wait/notify的消息通知机制;

    • 使用Lock的Condition的await/signal的消息通知机制;

    • 使用BlockingQueue实现。

    本文主要将这三种实现方式进行总结归纳。

    wait/notify的消息通知机制

    预备知识

    Java 中,可以通过配合调用 Object 对象的 wait() 方法和 notify()方法或 notifyAll() 方法来实现线程间的通信。在线程中调用 wait() 方法,将阻塞当前线程,直至等到其他线程调用了 notify() 方法或 notifyAll() 方法进行通知之后,当前线程才能从wait()方法出返回,继续执行下面的操作。

    1. wait

      该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用 wait()之前,线程必须要获得该对象的对象监视器锁,即只能在同步方法或同步块中调用 wait()方法。调用wait()方法之后,当前线程会释放锁。如果调用wait()方法时,线程并未获取到锁的话,则会抛出IllegalMonitorStateException异常,这是以个RuntimeException。如果再次获取到锁的话,当前线程才能从wait()方法处成功返回。

    2. notify

      该方法也要在同步方法或同步块中调用,即在调用前,线程也必须要获得该对象的对象级别锁,如果调用 notify()时没有持有适当的锁,也会抛出 IllegalMonitorStateException
      该方法任意从WAITTING状态的线程中挑选一个进行通知,使得调用wait()方法的线程从等待队列移入到同步队列中,等待有机会再一次获取到锁,从而使得调用wait()方法的线程能够从wait()方法处退出。调用notify后,当前线程不会马上释放该对象锁,要等到程序退出同步块后,当前线程才会释放锁。

    3. notifyAll
      该方法与 notify ()方法的工作方式相同,重要的一点差异是:
      notifyAll 使所有原来在该对象上 wait 的线程统统退出WAITTING状态,使得他们全部从等待队列中移入到同步队列中去,等待下一次能够有机会获取到对象监视器锁。

    wait/notify消息通知潜在的一些问题

    notify过早通知

    notify 通知的遗漏很容易理解,即 threadA 还没开始 wait 的时候,threadB 已经 notify 了,这样,threadB 通知是没有任何响应的,当 threadB 退出 synchronized 代码块后,threadA 再开始 wait,便会一直阻塞等待,直到被别的线程打断。比如在下面的示例代码中,就模拟出notify早期通知带来的问题:

    public class EarlyNotifyDemo1 {
    
        private static String lockObject = "";
    
        public static void main(String[] args) {
            WaitThread waitThread = new WaitThread(lockObject);
            NotifyThread notifyThread = new NotifyThread(lockObject);
            notifyThread.start();
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            waitThread.start();
        }
    
        static class WaitThread extends Thread {
            private String lock;
    
            public WaitThread(String lock) {
                this.lock = lock;
            }
    
            @Override
            public void run() {
                synchronized (lock) {
                    try {
                        System.out.println(Thread.currentThread().getName() + "  进去代码块");
                        System.out.println(Thread.currentThread().getName() + "  开始wait");
                        lock.wait();
                        System.out.println(Thread.currentThread().getName() + "   结束wait");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        static class NotifyThread extends Thread {
            private String lock;
    
            public NotifyThread(String lock) {
                this.lock = lock;
            }
    
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() + "  进去代码块");
                    System.out.println(Thread.currentThread().getName() + "  开始notify");
                    lock.notify();
                    System.out.println(Thread.currentThread().getName() + "   结束开始notify");
                }
            }
        }
    }
    

    输出结果

    Thread-1  进去代码块
    Thread-1  开始notify
    Thread-1   结束开始notify
    Thread-0  进去代码块
    Thread-0  开始wait
    

    示例中开启了两个线程,一个是WaitThread,另一个是NotifyThread。NotifyThread会先启动,先调用notify方法。然后WaitThread线程才启动,调用wait方法,但是由于通知过了,wait方法就无法再获取到相应的通知,因此WaitThread会一直在wait方法出阻塞,这种现象就是通知过早的现象。针对这种现象,解决方法,一般是添加一个状态标志,让waitThread调用wait方法前先判断状态是否已经改变了没,如果通知早已发出的话,WaitThread就不再去wait。对上面的代码进行更正:

    public class EarlyNotifyDemo2 {
    
        private static String lockObject = "";
        private static boolean isWait = true;
    
        public static void main(String[] args) {
            WaitThread waitThread = new WaitThread(lockObject);
            NotifyThread notifyThread = new NotifyThread(lockObject);
            notifyThread.start();
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            waitThread.start();
        }
    
        static class WaitThread extends Thread {
            private String lock;
    
            public WaitThread(String lock) {
                this.lock = lock;
            }
    
            @Override
            public void run() {
                synchronized (lock) {
                    try {
                        while (isWait) {
                            System.out.println(Thread.currentThread().getName() + "  进去代码块");
                            System.out.println(Thread.currentThread().getName() + "  开始wait");
                            lock.wait();
                            System.out.println(Thread.currentThread().getName() + "   结束wait");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        static class NotifyThread extends Thread {
            private String lock;
    
            public NotifyThread(String lock) {
                this.lock = lock;
            }
    
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() + "  进去代码块");
                    System.out.println(Thread.currentThread().getName() + "  开始notify");
                    lock.notifyAll();
                    isWait = false;
                    System.out.println(Thread.currentThread().getName() + "   结束开始notify");
                }
            }
        }
    }
    

    这段代码只是增加了一个isWait状态变量,NotifyThread调用notify方法后会对状态变量进行更新,在WaitThread中调用wait方法之前会先对状态变量进行判断,在该示例中,调用notify后将状态变量isWait改变为false,因此,在WaitThread中while对isWait判断后就不会执行wait方法,从而避免了Notify过早通知造成遗漏的情况。

    总结:在使用线程的等待/通知机制时,一般都要配合一个 boolean 变量值(或者其他能够判断真假的条件),在 notify 之前改变该 boolean 变量的值,让 wait 返回后能够退出 while 循环(一般都要在 wait 方法外围加一层 while 循环,以防止早期通知),或在通知被遗漏后,不会被阻塞在 wait 方法处。这样便保证了程序的正确性

    等待wait的条件发生变化

    如果线程在等待时接受到了通知,但是之后等待的条件发生了变化,并没有再次对等待条件进行判断,也会导致程序出现错误。

    下面用一个例子来说明这种情况

    public class ConditionChangeDemo1 {
    
        private static List<String> lockObject = new ArrayList();
    
        public static void main(String[] args) {
            Consumer consumer1 = new Consumer(lockObject);
            Consumer consumer2 = new Consumer(lockObject);
            Productor productor = new Productor(lockObject);
            consumer1.start();
            consumer2.start();
            productor.start();
        }
    
    
        static class Consumer extends Thread {
            private List<String> lock;
    
            public Consumer(List lock) {
                this.lock = lock;
            }
    
            @Override
            public void run() {
                synchronized (lock) {
                    try {
                        //这里使用if的话,就会存在wait条件变化造成程序错误的问题
                        if (lock.isEmpty()) {
                            System.out.println(Thread.currentThread().getName() + " list为空");
                            System.out.println(Thread.currentThread().getName() + " 调用wait方法");
                            lock.wait();
                            System.out.println(Thread.currentThread().getName() + "  wait方法结束");
                        }
                        String element = lock.remove(0);
                        System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
    
    
        }
    
    
        static class Productor extends Thread {
            private List<String> lock;
    
            public Productor(List lock) {
                this.lock = lock;
            }
    
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() + " 开始添加元素");
                    lock.add(Thread.currentThread().getName());
                    lock.notifyAll();
                }
            }
    
        }
    
    }
    

    输出结果

    Thread-0 list为空
    Thread-0 调用wait方法
    Thread-2 开始添加元素
    Thread-1 取出第一个元素为:Thread-2
    Thread-0  wait方法结束
    Exception in thread "Thread-0" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    

    异常原因分析:在这个例子中一共开启了3个线程,Consumer1,Consumer2以及Productor。首先Consumer1调用了wait方法后,线程处于了WAITTING状态,并且将对象锁释放出来。因此,Consumer2能够获取对象锁,从而进入到同步代块中,当执行到wait方法时,同样的也会释放对象锁。因此,productor能够获取到对象锁,进入到同步代码块中,向list中插入数据后,通过notifyAll方法通知处于WAITING状态的Consumer1和Consumer2线程。consumer1得到对象锁后,从wait方法出退出,删除了一个元素让List为空,方法执行结束,退出同步块,释放掉对象锁。这个时候Consumer2获取到对象锁后,从wait方法退出,继续往下执行,这个时候Consumer2再执行lock.remove(0);就会出错,因为List由于Consumer1删除一个元素之后已经为空了。

    解决方案:通过上面的分析,可以看出Consumer2报异常是因为线程从wait方法退出之后没有再次对wait条件进行判断,因此,此时的wait条件已经发生了变化。解决办法就是,在wait退出之后再对条件进行判断即可。

    public class ConditionChangeDemo2 {
    
        private static List<String> lockObject = new ArrayList();
    
        public static void main(String[] args) {
            Consumer consumer1 = new Consumer(lockObject);
            Consumer consumer2 = new Consumer(lockObject);
            Productor productor = new Productor(lockObject);
            consumer1.start();
            consumer2.start();
            productor.start();
        }
    
    
        static class Consumer extends Thread {
    
            private List<String> lock;
    
            public Consumer(List lock) {
                this.lock = lock;
            }
    
            @Override
            public void run() {
                synchronized (lock) {
                    try {
                        //这里使用if的话,就会存在wait条件变化造成程序错误的问题
                        while (lock.isEmpty()) {
                            System.out.println(Thread.currentThread().getName() + " list为空");
                            System.out.println(Thread.currentThread().getName() + " 调用wait方法");
                            lock.wait();
                            System.out.println(Thread.currentThread().getName() + "  wait方法结束");
                        }
                        String element = lock.remove(0);
                        System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
        static class Productor extends Thread {
            private List<String> lock;
    
            public Productor(List lock) {
                this.lock = lock;
            }
    
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() + " 开始添加元素");
                    lock.add(Thread.currentThread().getName());
                    lock.notifyAll();
                }
            }
    
        }
    
    }
    

    输出结果

    Thread-0 list为空
    Thread-0 调用wait方法
    Thread-2 开始添加元素
    Thread-1 取出第一个元素为:Thread-2
    Thread-0  wait方法结束
    Thread-0 list为空
    Thread-0 调用wait方法
    

    上面的代码与之前的代码仅仅只是将 wait 外围的 if 语句改为 while 循环即可,这样当 list 为空时,线程便会继续等待,而不会继续去执行删除 list 中元素的代码。

    总结:在使用线程的等待/通知机制时,一般都要在 while 循环中调用 wait()方法,因此配合使用一个 boolean 变量(或其他能判断真假的条件,如本文中的 list.isEmpty()),满足 while 循环的条件时,进入 while 循环,执行 wait()方法,不满足 while 循环的条件时,跳出循环,执行后面的代码。

    假死状态

    现象:如果是多消费者和多生产者情况,如果使用notify方法可能会出现“假死”的情况,即唤醒的是同类线程。

    原因分析:假设当前多个生产者线程会调用wait方法阻塞等待,当其中的生产者线程获取到对象锁之后使用notify通知处于WAITTING状态的线程,如果唤醒的仍然是生产者线程,就会造成所有的生产者线程都处于等待状态。

    解决办法:将notify方法替换成notifyAll方法,如果使用的是lock的话,就将signal方法替换成signalAll方法。

    总结

    在Object提供的消息通知机制应该遵循如下这些条件:

    1. 永远在while循环中对条件进行判断而不是if语句中进行wait条件的判断;
    2. 使用NotifyAll而不是使用notify。

    基本的使用范式如下:

    // The standard idiom for calling the wait method in Java 
    synchronized (sharedObject) { 
        while (condition) { 
        sharedObject.wait(); 
            // (Releases lock, and reacquires on wakeup) 
        } 
        // do action based upon condition e.g. take or put into queue 
    }
    

    wait/notifyAll实现生产者-消费者

    利用wait/notifyAll实现生产者和消费者代码如下:

    public class ProductorConsumerDemo1 {
    
        public static void main(String[] args) {
    
            LinkedList linkedList = new LinkedList();
            ExecutorService service = Executors.newFixedThreadPool(15);
            for (int i = 0; i < 5; i++) {
                service.submit(new Productor(linkedList, 8));
            }
    
            for (int i = 0; i < 10; i++) {
                service.submit(new Consumer(linkedList));
            }
    
        }
    
        static class Productor implements Runnable {
    
            private List<Integer> list;
            private int maxLength;
    
            public Productor(List list, int maxLength) {
                this.list = list;
                this.maxLength = maxLength;
            }
    
            @Override
            public void run() {
                while (true) {
                    synchronized (list) {
                        try {
                            while (list.size() == maxLength) {
                                System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                                list.wait();
                                System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                            }
                            Random random = new Random();
                            int i = random.nextInt();
                            System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                            list.add(i);
                            list.notifyAll();
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                }
            }
        }
    
        static class Consumer implements Runnable {
    
            private List<Integer> list;
    
            public Consumer(List list) {
                this.list = list;
            }
    
            @Override
            public void run() {
                while (true) {
                    synchronized (list) {
                        try {
                            while (list.isEmpty()) {
                                System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                                list.wait();
                                System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                            }
                            Integer element = list.remove(0);
                            System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                            list.notifyAll();
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    
    }
    

    输出结果

    生产者pool-1-thread-2 生产数据-703210513
    生产者pool-1-thread-2 生产数据-1025434820
    生产者pool-1-thread-2 生产数据70070412
    生产者pool-1-thread-2 生产数据-598504371
    生产者pool-1-thread-2 生产数据-716978999
    生产者pool-1-thread-2 生产数据-1175198461
    生产者pool-1-thread-2 生产数据-1212912406
    生产者pool-1-thread-2 生产数据-332467186
    生产者pool-1-thread-2  list以达到最大容量,进行wait
    消费者pool-1-thread-15  消费数据:-703210513
    消费者pool-1-thread-15  消费数据:-1025434820
    消费者pool-1-thread-15  消费数据:70070412
    消费者pool-1-thread-15  消费数据:-598504371
    消费者pool-1-thread-15  消费数据:-716978999
    消费者pool-1-thread-15  消费数据:-1175198461
    消费者pool-1-thread-15  消费数据:-1212912406
    消费者pool-1-thread-15  消费数据:-332467186
    消费者pool-1-thread-15  list为空,进行wait
    消费者pool-1-thread-14  list为空,进行wait
    消费者pool-1-thread-13  list为空,进行wait
    消费者pool-1-thread-11  list为空,进行wait
    消费者pool-1-thread-12  list为空,进行wait
    消费者pool-1-thread-10  list为空,进行wait
    消费者pool-1-thread-9  list为空,进行wait
    消费者pool-1-thread-8  list为空,进行wait
    消费者pool-1-thread-7  list为空,进行wait
    消费者pool-1-thread-6  list为空,进行wait
    生产者pool-1-thread-5 生产数据84590545
    生产者pool-1-thread-5 生产数据-1631754695
    

    使用Lock中Condition的await/signalAll实现生产者-消费者

    参照Object的wait和notify/notifyAll方法,Condition也提供了同样的方法:

    针对wait方法

    void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;

    long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时;

    boolean await(long time, TimeUnit unit)throws InterruptedException:同第二种,支持自定义时间单位

    boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间

    针对notify方法

    void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。

    void signalAll():与signal的区别在于能够唤醒所有等待在condition上的线程

    也就是说wait—>await,notify---->Signal。另外,关于lock中condition消息通知的原理解析可以看这篇文章。

    如果采用lock中Conditon的消息通知原理来实现生产者-消费者问题,原理同使用wait/notifyAll一样。直接上代码:

    public class ProductorConsumerDemo2 {
    
        private static ReentrantLock lock = new ReentrantLock();
        private static Condition full = lock.newCondition();
        private static Condition empty = lock.newCondition();
    
        public static void main(String[] args) {
            LinkedList linkedList = new LinkedList();
            ExecutorService service = Executors.newFixedThreadPool(15);
            for (int i = 0; i < 5; i++) {
                service.submit(new Productor(linkedList, 8, lock));
            }
            for (int i = 0; i < 10; i++) {
                service.submit(new Consumer(linkedList, lock));
            }
    
        }
    
        static class Productor implements Runnable {
    
            private List<Integer> list;
            private int maxLength;
            private Lock lock;
    
            public Productor(List list, int maxLength, Lock lock) {
                this.list = list;
                this.maxLength = maxLength;
                this.lock = lock;
            }
    
            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        while (list.size() == maxLength) {
                            System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                            full.await();
                            System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                        }
                        Random random = new Random();
                        int i = random.nextInt();
                        System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                        list.add(i);
                        empty.signalAll();
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
    
        static class Consumer implements Runnable {
    
            private List<Integer> list;
            private Lock lock;
    
            public Consumer(List list, Lock lock) {
                this.list = list;
                this.lock = lock;
            }
    
            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        while (list.isEmpty()) {
                            System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                            empty.await();
                            System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                        }
                        Integer element = list.remove(0);
                        System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                        full.signalAll();
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
    
    }
    

    输出结果

    生产者pool-1-thread-1 生产数据-1633842993
    生产者pool-1-thread-1 生产数据1337251950
    生产者pool-1-thread-1 生产数据1310879631
    生产者pool-1-thread-1 生产数据-214297115
    生产者pool-1-thread-1 生产数据738937512
    生产者pool-1-thread-1 生产数据13060041
    生产者pool-1-thread-1 生产数据-957049554
    生产者pool-1-thread-1 生产数据-1062017880
    生产者pool-1-thread-1  list以达到最大容量,进行wait
    生产者pool-1-thread-2  list以达到最大容量,进行wait
    生产者pool-1-thread-3  list以达到最大容量,进行wait
    生产者pool-1-thread-4  list以达到最大容量,进行wait
    生产者pool-1-thread-5  list以达到最大容量,进行wait
    消费者pool-1-thread-6  消费数据:-1633842993
    消费者pool-1-thread-6  消费数据:1337251950
    消费者pool-1-thread-6  消费数据:1310879631
    消费者pool-1-thread-6  消费数据:-214297115
    消费者pool-1-thread-6  消费数据:738937512
    消费者pool-1-thread-6  消费数据:13060041
    消费者pool-1-thread-6  消费数据:-957049554
    消费者pool-1-thread-6  消费数据:-1062017880
    消费者pool-1-thread-6  list为空,进行wait
    消费者pool-1-thread-7  list为空,进行wait
    消费者pool-1-thread-8  list为空,进行wait
    消费者pool-1-thread-9  list为空,进行wait
    消费者pool-1-thread-10  list为空,进行wait
    消费者pool-1-thread-11  list为空,进行wait
    消费者pool-1-thread-12  list为空,进行wait
    消费者pool-1-thread-13  list为空,进行wait
    消费者pool-1-thread-14  list为空,进行wait
    消费者pool-1-thread-15  list为空,进行wait
    生产者pool-1-thread-1  退出wait
    生产者pool-1-thread-1 生产数据1949864858
    生产者pool-1-thread-1 生产数据-1693880970
    

    使用BlockingQueue实现生产者-消费者

    由于BlockingQueue内部实现就附加了两个阻塞操作。即当队列已满时,阻塞向队列中插入数据的线程,直至队列中未满;当队列为空时,阻塞从队列中获取数据的线程,直至队列非空时为止。关于BlockingQueue更多细节可以看这篇文章。可以利用BlockingQueue实现生产者-消费者为题,阻塞队列完全可以充当共享数据区域,就可以很好的完成生产者和消费者线程之间的协作。

    public class ProductorConsumerDmoe3 {
    
        private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    
        public static void main(String[] args) {
            ExecutorService service = Executors.newFixedThreadPool(15);
            for (int i = 0; i < 5; i++) {
                service.submit(new Productor(queue));
            }
            for (int i = 0; i < 10; i++) {
                service.submit(new Consumer(queue));
            }
        }
    
        static class Productor implements Runnable {
            private BlockingQueue queue;
    
            public Productor(BlockingQueue queue) {
                this.queue = queue;
            }
    
            @Override
            public void run() {
                try {
                    while (true) {
                        Random random = new Random();
                        int i = random.nextInt();
                        System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
                        queue.put(i);
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class Consumer implements Runnable {
            private BlockingQueue queue;
    
            public Consumer(BlockingQueue queue) {
                this.queue = queue;
            }
    
            @Override
            public void run() {
                try {
                    while (true) {
                        Integer element = (Integer) queue.take();
                        System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    

    输出结果

    生产者pool-1-thread-2生产数据-1056722868
    生产者pool-1-thread-1生产数据-1217947426
    生产者pool-1-thread-3生产数据590686437
    生产者pool-1-thread-4生产数据1782376429
    生产者pool-1-thread-5生产数据1558897279
    消费者pool-1-thread-6正在消费数据-1056722868
    消费者pool-1-thread-7正在消费数据-1217947426
    消费者pool-1-thread-8正在消费数据590686437
    消费者pool-1-thread-9正在消费数据1782376429
    消费者pool-1-thread-10正在消费数据1558897279
    生产者pool-1-thread-4生产数据1977644261
    生产者pool-1-thread-3生产数据182370155
    消费者pool-1-thread-11正在消费数据1977644261
    生产者pool-1-thread-2生产数据949821636
    生产者pool-1-thread-5生产数据1931032717
    消费者pool-1-thread-13正在消费数据949821636
    生产者pool-1-thread-1生产数据873417555
    消费者pool-1-thread-14正在消费数据1931032717
    消费者pool-1-thread-12正在消费数据182370155
    消费者pool-1-thread-15正在消费数据873417555
    

    可以看出,使用BlockingQueue来实现生产者-消费者很简洁,这正是利用了BlockingQueue插入和获取数据附加阻塞操作的特性。

    关于生产者-消费者实现的三中方式,到这里就全部总结出来,如果觉得不错的话,请点赞,也算是给我的鼓励,在此表示感谢!

    展开全文
  • C++ 生产者消费者模式的简单实现

    千次阅读 2019-08-20 11:47:03
    这么做可以对生产者与消费者进行解耦,这样一来消费者不直接调用生产者,使得生产者的不会因为生产者的具体处理而阻塞,充分利用资源。 02 思路介绍 代码中的具体元素为Stone类,生产者线程负责向缓冲类Busket中...

    01 模式简介

    注意,生产者消费者模式并不是23中设计模式之一。

    生产者消费者模式可以理解为在生产者和消费者之间添加一个缓冲区,生产者只负责向缓冲区添加元素,而消费者只负责从缓冲区提取元素并使用。

    这么做可以对生产者与消费者进行解耦,这样一来消费者不直接调用生产者,使得生产者的不会因为生产者的具体处理而阻塞,充分利用资源。

    02 思路介绍

    代码中的具体元素为Stone类,生产者线程负责向缓冲类Busket中添加具体元素,而消费者线程则在缓冲类中的队列不为空的时候取用这些元素。

    其中缓冲类需要带一把互斥锁,使得不论是被添加还是被取用的动作都能保证是原子性的。

    用到了boost库的thread_group,如果不了解可以自行创建线程进行调试。

    03 结果截取

    NUM IN BUSKET NOW IS : 20
    THREAD 0x700008668000 GET A STONE WEIGHT IS 75.

    NUM IN BUSKET NOW IS : 19
    THREAD 0x7000085e5000 GET A STONE WEIGHT IS 35.

    NUM IN BUSKET NOW IS : 18
    THREAD 0x700008562000 GET A STONE WEIGHT IS 98.

    NUM IN BUSKET NOW IS : 17
    THREAD 0x7000086eb000 GET A STONE WEIGHT IS 42.

    NUM IN BUSKET NOW IS : 16
    THREAD 0x7000084df000 GET A STONE WEIGHT IS 99.

    NUM IN BUSKET NOW IS : 15
    THREAD 0x7000082d3000 GET A STONE WEIGHT IS 68.

    04 Show me the code

    #include <iostream>
    #include <mutex>
    #include <chrono>
    #include <thread>
    #include <boost/thread/thread.hpp>
    
    using namespace std;
    
    // 用于防止多个线程抢占终端输出导致输出混乱的锁
    mutex log_mt;
    
    // 具体的元素
    class Stone {
    public:
        float weight;
    
        ~Stone() {}
    
        Stone() {
            weight = rand() % 100;
        }
    };
    
    
    // 缓冲类
    class Busket {
    private:
        deque<Stone> dq;
    public:
        mutex mt;
      
        bool empty() const {
            return dq.empty();
        }
    
        const Stone get() {
            Stone temp = *(dq.begin());
            dq.erase(dq.begin());
            return temp;
        }
    
        void add() {
            dq.emplace_back(Stone());
        }
    
        const unsigned int size() {
            return dq.size();
        }
    };
    
    
    // 生产者类
    class Provider {
    private:
        int aim_num;
        Busket* pbusket;
      
    public:
        ~Provider() {}
    
        Provider(const int num, Busket& busket)
        : aim_num(num), pbusket(&busket) {}
    
        void build_stone() {
            const int NUM_TO_BUILD = rand() % 20;
            int ct = 0;
    
            while (pbusket->size() < aim_num && ct < NUM_TO_BUILD) {
                ++ct;
                lock_guard<mutex> lg(pbusket->mt);
                pbusket->add();
            }
        }
    };
    
    
    // 消费者类
    class Custmer {
    private:
        Busket* pbusket;
      
    public:
        ~Custmer() {
            pbusket = nullptr;
        }
    
        Custmer(Busket& busket)
        : pbusket(&busket) {}
    
        const Stone get() const {
            lock_guard<mutex> lg(pbusket->mt);
            cout << "NUM IN BUSKET NOW IS : " << pbusket->size() << endl;
            return pbusket->get();
        }
    
        bool can_take() const {
            return !pbusket->empty();
        }
    };
    
    
    // 生产者子函数
    void func_provider(Provider& provider) {
        while (1) {
            this_thread::sleep_for(chrono::seconds(1));
            provider.build_stone();
        }
    }
    
    
    // 消费者子函数
    void func_custmer(Custmer& custmer) {
        while(1) {
            this_thread::sleep_for(chrono::seconds(1));
            lock_guard<mutex> lg(log_mt);
            if (custmer.can_take()) {
                int temp = custmer.get().weight;
                cout << "THREAD " << this_thread::get_id() << " GET A STONE WEIGHT IS " << temp << "." << endl << endl;
            }
        }
    }
    
    
    int main() {
        // 缓冲区
        unique_ptr<Busket> pbusket(new Busket());
    
        // 生产者
        unique_ptr<Provider> pprovider(new Provider(200, *pbusket));
    
        // 消费者
        unique_ptr<Custmer> pcustmer(new Custmer(*pbusket));
    
    
        // boost线程池
        boost::thread_group threads;
    
        // 生产者
        threads.create_thread(boost::bind(func_provider, *pprovider));
    
        // 创建10个消费者
        for (int i = 0; i < 10; ++i) {
            threads.create_thread(boost::bind(func_custmer, *pcustmer));
        }
    
        // main函数阻塞
        threads.join_all();
    
    
        return 0;
    }
    
    展开全文
  • 消息队列:生产者/消费者模式

    万次阅读 2019-02-28 15:34:10
    生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,...

    1.什么是生产者消费者模式

           生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

           这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

    2.生产消费者模型

           生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图。

           在日益发展的服务类型中,譬如注册用户这种服务,它可能解耦成好几种独立的服务(账号验证,邮箱验证码,手机短信码等)。它们作为消费者,等待用户输入数据,在前台数据提交之后会经过分解并发送到各个服务所在的url,分发的那个角色就相当于生产者。消费者在获取数据时候有可能一次不能处理完,那么它们各自有一个请求队列,那就是内存缓冲区了。做这项工作的框架叫做消息队列。

    3.生产者消费者模型的实现

      生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用List数组队列,数据类型只需要定义一个简单的类就好。关键是如何处理多线程之间的协作。这其实也是多线程通信的一个范例。

      在这个模型中,最关键就是内存缓冲区为空的时候消费者必须等待,而内存缓冲区满的时候,生产者必须等待。其他时候可以是个动态平衡。值得注意的是多线程对临界区资源的操作时候必须保证在读写中只能存在一个线程,所以需要设计锁的策略。

    4.为什么要使用生产者和消费者模式

           在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

    为了不至于太抽象,我们举一个寄信的例子(虽说这年头寄信已经不时兴,但这个例子还是比较贴切的)。假设你要寄一封平信,大致过程如下:

        1、你把信写好——相当于生产者制造数据

        2、你把信放入邮筒——相当于生产者把数据放入缓冲区

        3、邮递员把信从邮筒取出——相当于消费者把数据取出缓冲区

        4、邮递员把信拿去邮局做相应的处理——相当于消费者处理数据

    4.1优点

    • 解耦

        假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

           接着上述的例子,如果不使用邮筒(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须得认识谁是邮递员,才能把信给他(光凭身上穿的制服,万一有人假冒,就惨了)。这就产生和你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮筒相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。

    • 支持并发(concurrency)

           生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。

           使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。

    • 支持忙闲不均

           缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

           为了充分复用,我们再拿寄信的例子来说事。假设邮递员一次只能带走1000封信。万一某次碰上情人节(也可能是圣诞节)送贺卡,需要寄出去的信超过1000封,这时候邮筒这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮筒中,等下次过来时再拿走。

    5.多生产者和多消费者场景

            在多核时代,多线程并发处理速度比单线程处理速度更快,所以我们可以使用多个线程来生产数据,同样可以使用多个消费线程来消费数据。而更复杂的情况是,消费者消费的数据,有可能需要继续处理,于是消费者处理完数据之后,它又要作为生产者把数据放在新的队列里,交给其他消费者继续处理。如下图:

    ç产èæ¶è´¹è模å¼

    6.线程池与生产消费者模式

           Java中的线程池类其实就是一种生产者和消费者模式的实现方式,但是我觉得其实现方式更加高明。生产者把任务丢给线程池,线程池创建线程并处理任务,如果将要运行的任务数大于线程池的基本线程数就把任务扔到阻塞队列里,这种做法比只使用一个阻塞队列来实现生产者和消费者模式显然要高明很多,因为消费者能够处理直接就处理掉了,这样速度更快,而生产者先存,消费者再取这种方式显然慢一些。

           我们的系统也可以使用线程池来实现多生产者消费者模式。比如创建N个不同规模的Java线程池来处理不同性质的任务,比如线程池1将数据读到内存之后,交给线程池2里的线程继续处理压缩数据。线程池1主要处理IO密集型任务,线程池2主要处理CPU密集型任务。

    7.内存缓冲区

    最传统、最常见的方式:队列(FIFO)作缓冲。

    7.1 线程方式

    并发线程中使用队列的优缺点

    • 内存分配的性能

           在线程方式下,生产者和消费者各自是一个线程。生产者把数据写入队列头(以下简称push),消费者从队列尾部读出数据(以下简称pop)。当队列为空,消费者就稍息(稍事休息);当队列满(达到最大长度),生产者就稍息。整个流程并不复杂。

           上述过程会有一个主要的问题是关于内存分配的性能开销。对于常见的队列实现:在每次push时,可能涉及到堆内存的分配;在每次pop时,可能涉及堆内存的释放。假如生产者和消费者都很勤快,频繁地push、pop,那内存分配的开销就很可观了。对于内存分配的开销,可查找Java性能优化相关知识。

            解决办法:环形缓冲区

    • 同步和互斥的性能

           另外,由于两个线程共用一个队列,自然就会涉及到线程间诸如同步、互斥、死锁等等。这会儿要细谈的是,同步和互斥的性能开销。在很多场合中,诸如信号量、互斥量等的使用也是有不小的开销的(某些情况下,也可能导致用户态/核心态切换)。如果像刚才所说,生产者和消费者都很勤快,那这些开销也不容小觑。

            解决办法:双缓冲区

    • 适用于队列的场合

           由于队列是很常见的数据结构,大部分编程语言都内置了队列的支持,有些语言甚至提供了线程安全的队列(比如JDK 1.5引入的ArrayBlockingQueue)。因此,开发人员可以捡现成,避免了重新发明轮子。

           所以,假如你的数据流量不是很大,采用队列缓冲区的好处还是很明显的:逻辑清晰、代码简单、维护方便。比较符合KISS原则。

    7.2 进程方式

           跨进程的生产者/消费者模式,非常依赖于具体的进程间通讯(IPC)方式。而IPC的种类很多。下面介绍比较常用的跨平台、且编程语言支持较多的IPC方式。

    • 匿名管道

           感觉管道是最像队列的IPC类型。生产者进程在管道的写端放入数据;消费者进程在管道的读端取出数据。整个的效果和线程中使用队列非常类似,区别在于使用管道就无需操心线程安全、内存分配等琐事(操作系统暗中都帮你搞定了)。

           管道又分命名管道匿名管道两种,今天主要聊匿名管道。因为命名管道在不同的操作系统下差异较大(比如Win32和POSIX,在命名管道的API接口和功能实现上都有较大差异;有些平台不支持命名管道,比如Windows CE)。除了操作系统的问题,对于有些编程语言(比如Java)来说,命名管道是无法使用的。

           其实匿名管道在不同平台上的API接口,也是有差异的(比如Win32的CreatePipe和POSIX的pipe,用法就很不一样)。但是我们可以仅使用标准输入标准输出(以下简称stdio)来进行数据的流入流出。然后利用shell的管道符把生产者进程和消费者进程关联起来。实际上,很多操作系统(尤其是POSIX风格的)自带的命令都充分利用了这个特性来实现数据的传输(比如more、grep等),如此优点

           1、基本上所有操作系统都支持在shell方式下使用管道符。因此很容易实现跨平台。

           2、大部分编程语言都能够操作stdio,因此跨编程语言也就容易实现。

           3、管道方式省却了线程安全方面的琐事。有利于降低开发、调试成本。

           当然,这种方式也有自身的缺点

           1、生产者进程和消费者进程必须得在同一台主机上,无法跨机器通讯。这个缺点比较明显。

           2、在一对一的情况下,这种方式挺合用。但如果要扩展到一对多或者多对一,那就有点棘手了。所以这种方式的扩展性要打个折扣。假如今后要考虑类似的扩展,这个缺点就比较明显。

            3、由于管道是shell创建的,对于两边的进程不可见(程序看到的只是stdio)。在某些情况下,导致程序不便于对管道进行操纵(比如调整管道缓冲区尺寸)。这个缺点不太明显。

            4、最后,这种方式只能单向传数据。好在大多数情况下,消费者进程不需要传数据给生产者进程。万一你确实需要信息反馈(从消费者到生产者),那就费劲了。可能得考虑换种IPC方式。

           注意事项:

           1、对stdio进行读写操作是以阻塞方式进行。比如管道中没有数据,消费者进程的读操作就会一直停在哪儿,直到管道中重新有数据。

           2、由于stdio内部带有自己的缓冲区(这缓冲区和管道缓冲区是两码事),有时会导致一些不太爽的现象(比如生产者进程输出了数据,但消费者进程没有立即读到)。

    • SOCKET(TCP方式)

           基于TCP方式的SOCKET通讯是又一个类似于队列的IPC方式。它同样保证了数据的顺序到达;同样有缓冲的机制。而且跨平台和跨语言,和刚才介绍的shell管道符方式类似。

            SOCKET相比shell管道符的方式,主要有如下几个优点:

        1、SOCKET方式可以跨机器(便于实现分布式)。这是主要优点。

        2、SOCKET方式便于将来扩展成为多对一或者一对多。这也是主要优点。

        3、SOCKET可以设置阻塞和非阻塞方法,用起来比较灵活。这是次要优点。

        4、SOCKET支持双向通讯,有利于消费者反馈信息。

           当然有利就有弊。相对于上述shell管道的方式,使用SOCKET在编程上会更复杂一些。好在前人已经做了大量的工作,可借助于这些第三方的库和框架,比如C++的ACE库、Python的Twisted。

           虽然TCP在很多方面比UDP可靠,但鉴于跨机器通讯先天的不可预料性,可以在生产者进程和消费者进程内部各自再引入基于线程的"生产者/消费者模式",如下图:

    这么做的关键点在于把代码分为两部分:生产线程和消费线程属于和业务逻辑相关的代码(和通讯逻辑无关);发送线程和接收线程属于通讯相关的代码(和业务逻辑无关)。

        这样的好处是很明显的,具体如下:

        1、能够应对暂时性的网络故障。并且在网络故障解除后,能够继续工作。

        2、网络故障的应对处理方式(比如断开后的尝试重连),只影响发送和接收线程,不会影响生产线程和消费线程(业务逻辑部分)。

        3、具体的SOCKET方式(阻塞和非阻塞)只影响发送和接收线程,不影响生产线程和消费线程(业务逻辑部分)。

        4、不依赖TCP自身的发送缓冲区和接收缓冲区。(默认的TCP缓冲区的大小可能无法满足实际要求)

        5、业务逻辑的变化(比如业务需求变更)不影响发送线程和接收线程。

        针对上述的最后一条,如果整个业务系统中有多个进程是采用上述的模式,那或许可以重构:在业务逻辑代码和通讯逻辑代码之间,把业务逻辑无关的部分封装成一个通讯中间件。

    7.3 环形缓冲区

    使用场景:当存储空间(不仅包括内存,还可能包括诸如硬盘之类的存储介质)的分配/释放非常频繁并且确实产生了明显的影响,才应该考虑环形缓冲区的使用。否则的话,还是选用最基本、最简单的队列缓冲区。

    • 环形缓冲区 vs 队列缓冲区

        1.外部接口相似

        普通的队列有一个写入端和一个读出端。队列为空的时候,读出端无法读取数据;当队列满(达到最大尺寸)时,写入端无法写入数据。

        对于使用者来讲,环形缓冲区和队列缓冲区是一样的。它也有一个写入端(用于push)和一个读出端(用于pop),也有缓冲区“满”和“空”的状态。所以,从队列缓冲区切换到环形缓冲区,对于使用者来说能比较平滑地过渡。

        2.内部结构迥异

        虽然两者的对外接口差不多,但是内部结构和运作机制有很大差别。重点介绍一下环形缓冲区的内部结构。

        可以把环形缓冲区的读出端(以下简称R)和写入端(以下简称W)想象成是两个人在体育场跑道上追逐(R追W)。当R追上W的时候,就是缓冲区为空;当W追上R的时候(W比R多跑一圈),就是缓冲区满。

        为了形象起见,如下:

     从上图可以看出,环形缓冲区所有的push和pop操作都是在一个固定的存储空间内进行。而队列缓冲区在push的时候,可能会分配存储空间用于存储新元素;在pop时,可能会释放废弃元素的存储空间。所以环形方式相比队列方式,少掉了对于缓冲区元素所用存储空间的分配、释放。这是环形缓冲区的一个主要优势。

    • 环形缓冲区的实现

           1.数组方式 vs 链表方式

           环形缓冲区的内部实现,即可基于数组(此处的数组,泛指连续存储空间)实现,也可基于链表实现。

           数组在物理存储上是一维的连续线性结构,可以在初始化时,把存储空间一次性分配好,这是数组方式的优点。但是要使用数组来模拟环,你必须在逻辑上把数组的头和尾相连。在顺序遍历数组时,对尾部元素(最后一个元素)要作一下特殊处理。访问尾部元素的下一个元素时,要重新回到头部元素(第0个元素)。如下图所示:

            使用链表的方式,正好和数组相反:链表省去了头尾相连的特殊处理。但是链表在初始化的时候比较繁琐,而且在有些场合(比如跨进程的IPC)不太方便使用。

           2.读写操作

           环形缓冲区要维护两个索引,分别对应写入端(W)和读取端(R)。写入(push)的时候,先确保环没满,然后把数据复制到W所对应的元素,最后W指向下一个元素;读取(pop)的时候,先确保环没空,然后返回R对应的元素,最后R指向下一个元素。

           3.判断“空”和“满”

           上述的操作并不复杂,不过有一个小小的麻烦:空环和满环的时候,R和W都指向同一个位置!这样就无法判断到底是“空”还是“满”。大体上有两种方法可以解决该问题。

           办法1:始终保持一个元素不用

           当空环的时候,R和W重叠。当W比R跑得快,追到距离R还有一个元素间隔的时候,就认为环已经满。当环内元素占用的存储空间较大的时候,这种办法显得很土(浪费空间)。

           办法2:维护额外变量

           如果不喜欢上述办法,还可以采用额外的变量来解决。比如可以用一个整数记录当前环中已经保存的元素个数(该整数>=0)。当R和W重叠的时候,通过该变量就可以知道是“空”还是“满”。

          4.元素的存储

           由于环形缓冲区本身就是要降低存储空间分配的开销,因此缓冲区中元素的类型要选好。尽量存储值类型的数据,而不要存储指针(引用)类型的数据。因为指针类型的数据又会引起存储空间(比如堆内存)的分配和释放,使得环形缓冲区的效果打折扣。

    • 应用场合

           如果所使用的编程语言和开发库中带有现成的、成熟的环形缓冲区,建议使用现成的库,不要重新制造轮子;确实找不到现成的,才考虑自己实现。

          1.用于并发线程

           和线程中的队列缓冲区类似,线程中的环形缓冲区也要考虑线程安全的问题。除非使用的环形缓冲区的库已经实现了线程安全,否则还是得自己动手搞定。线程方式下的环形缓冲区用得比较多,相关的网上资料也多,下面就大致介绍几个。

           对于C++的程序员,强烈推荐使用boost提供的circular_buffer模板,该模板最开始是在boost 1.35版本中引入的。鉴于boost在C++社区中的地位,大伙儿应该可以放心使用该模板。

           对于C程序员,可以去看看开源项目circbuf,不过该项目是GPL协议的,不太爽;而且活跃度不太高;而且只有一个开发人员。大伙儿慎用!建议只拿它当参考。

           对于C#程序员,可以参考CodeProject上的一个示例。

           2.用于并发进程

           进程间的环形缓冲区,似乎少有现成的库可用。

           适用于进程间环形缓冲的IPC类型,常见的有共享内存和文件。在这两种方式上进行环形缓冲,通常都采用数组的方式实现。程序事先分配好一个固定长度的存储空间,然后具体的读写操作、判断“空”和“满”、元素存储等细节就可参照前面所说的来进行。

           共享内存方式的性能很好,适用于数据流量很大的场景。但是有些语言(比如Java)对于共享内存不支持。因此,该方式在多语言协同开发的系统中,会有一定的局限性。

            而文件方式在编程语言方面支持很好,几乎所有编程语言都支持操作文件。但它可能会受限于磁盘读写(Disk I/O)的性能。所以文件方式不太适合于快速数据传输;但是对于某些“数据单元”很大的场合,文件方式是值得考虑的。

            对于进程间的环形缓冲区,同样要考虑好进程间的同步、互斥等问题。

    8.生产者消费者模式三种实现方式代码示例

    8.1 synchronized、wait和notify  

    package producerConsumer;
    //wait 和 notify
    public class ProducerConsumerWithWaitNofity {
        public static void main(String[] args) {
            Resource resource = new Resource();
            //生产者线程
            ProducerThread p1 = new ProducerThread(resource);
            ProducerThread p2 = new ProducerThread(resource);
            ProducerThread p3 = new ProducerThread(resource);
            //消费者线程
            ConsumerThread c1 = new ConsumerThread(resource);
            //ConsumerThread c2 = new ConsumerThread(resource);
            //ConsumerThread c3 = new ConsumerThread(resource);
        
            p1.start();
            p2.start();
            p3.start();
            c1.start();
            //c2.start();
            //c3.start();
        }
        
        
        
    }
    /**
     * 公共资源类
     * @author 
     *
     */
    class Resource{//重要
        //当前资源数量
        private int num = 0;
        //资源池中允许存放的资源数目
        private int size = 10;
    
        /**
         * 从资源池中取走资源
         */
        public synchronized void remove(){
            if(num > 0){
                num--;
                System.out.println("消费者" + Thread.currentThread().getName() +
                        "消耗一件资源," + "当前线程池有" + num + "个");
                notifyAll();//通知生产者生产资源
            }else{
                try {
                    //如果没有资源,则消费者进入等待状态
                    wait();
                    System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        /**
         * 向资源池中添加资源
         */
        public synchronized void add(){
            if(num < size){
                num++;
                System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" 
                + num + "个");
                //通知等待的消费者
                notifyAll();
            }else{
                //如果当前资源池中有10件资源
                try{
                    wait();//生产者进入等待状态,并释放锁
                    System.out.println(Thread.currentThread().getName()+"线程进入等待");
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 消费者线程
     */
    class ConsumerThread extends Thread{
        private Resource resource;
        public ConsumerThread(Resource resource){
            this.resource = resource;
        }
        @Override
        public void run() {
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.remove();
            }
        }
    }
    /**
     * 生产者线程
     */
    class ProducerThread extends Thread{
        private Resource resource;
        public ProducerThread(Resource resource){
            this.resource = resource;
        }
        @Override
        public void run() {
            //不断地生产资源
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.add();
            }
        }
        
    }

    8.2 lock和condition的await、signalAll  

    package producerConsumer;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    /**
     * 使用Lock 和 Condition解决生产者消费者问题
     * @author tangzhijing
     *
     */
    public class LockCondition {
            public static void main(String[] args) {
                Lock lock = new ReentrantLock();
                Condition producerCondition = lock.newCondition();
                Condition consumerCondition = lock.newCondition();
                Resource2 resource = new Resource2(lock,producerCondition,consumerCondition);
                
                //生产者线程
                ProducerThread2 producer1 = new ProducerThread2(resource);
                
                //消费者线程
                ConsumerThread2 consumer1 = new ConsumerThread2(resource);
                ConsumerThread2 consumer2 = new ConsumerThread2(resource);
                ConsumerThread2 consumer3 = new ConsumerThread2(resource);
                
                producer1.start();
                consumer1.start();
                consumer2.start();
                consumer3.start();
            }
    }
    /**
     * 消费者线程
     */
    class ConsumerThread2 extends Thread{
        private Resource2 resource;
        public ConsumerThread2(Resource2 resource){
            this.resource = resource;
            //setName("消费者");
        }
        public void run(){
            while(true){
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource.remove();
            }
        }
    }
    /**
     * 生产者线程
     * @author tangzhijing
     *
     */
    class ProducerThread2 extends Thread{
        private Resource2 resource;
        public ProducerThread2(Resource2 resource){
            this.resource = resource;
            setName("生产者");
        }
        public void run(){
            while(true){
                    try {
                        Thread.sleep((long) (1000 * Math.random()));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    resource.add();
            }
        }
    }
    /**
     * 公共资源类
     * @author tangzhijing
     *
     */
    class Resource2{
        private int num = 0;//当前资源数量
        private int size = 10;//资源池中允许存放的资源数目
        private Lock lock;
        private Condition producerCondition;
        private Condition consumerCondition;
        public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {
            this.lock = lock;
            this.producerCondition = producerCondition;
            this.consumerCondition = consumerCondition;
     
        }
        /**
         * 向资源池中添加资源
         */
        public void add(){
            lock.lock();
            try{
                if(num < size){
                    num++;
                    System.out.println(Thread.currentThread().getName() + 
                            "生产一件资源,当前资源池有" + num + "个");
                    //唤醒等待的消费者
                    consumerCondition.signalAll();
                }else{
                    //让生产者线程等待
                    try {
                        producerCondition.await();
                        System.out.println(Thread.currentThread().getName() + "线程进入等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }finally{
                lock.unlock();
            }
        }
        /**
         * 从资源池中取走资源
         */
        public void remove(){
            lock.lock();
            try{
                if(num > 0){
                    num--;
                    System.out.println("消费者" + Thread.currentThread().getName() 
                            + "消耗一件资源," + "当前资源池有" + num + "个");
                    producerCondition.signalAll();//唤醒等待的生产者
                }else{
                    try {
                        consumerCondition.await();
                        System.out.println(Thread.currentThread().getName() + "线程进入等待");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }//让消费者等待
                }
            }finally{
                lock.unlock();
            }
        }
        
    }

    8.3 lock和condition的await、signalAll

    package producerConsumer;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    //使用阻塞队列BlockingQueue解决生产者消费者
    public class BlockingQueueConsumerProducer {
        public static void main(String[] args) {
            Resource3 resource = new Resource3();
            //生产者线程
            ProducerThread3 p = new ProducerThread3(resource);
            //多个消费者
            ConsumerThread3 c1 = new ConsumerThread3(resource);
            ConsumerThread3 c2 = new ConsumerThread3(resource);
            ConsumerThread3 c3 = new ConsumerThread3(resource);
     
            p.start();
            c1.start();
            c2.start();
            c3.start();
        }
    }
    /**
     * 消费者线程
     * @author tangzhijing
     *
     */
    class ConsumerThread3 extends Thread {
        private Resource3 resource3;
     
        public ConsumerThread3(Resource3 resource) {
            this.resource3 = resource;
            //setName("消费者");
        }
     
        public void run() {
            while (true) {
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource3.remove();
            }
        }
    }
    /**
     * 生产者线程
     * @author tangzhijing
     *
     */
    class ProducerThread3 extends Thread{
        private Resource3 resource3;
        public ProducerThread3(Resource3 resource) {
            this.resource3 = resource;
            //setName("生产者");
        }
     
        public void run() {
            while (true) {
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                resource3.add();
            }
        }
    }
    class Resource3{
        private BlockingQueue resourceQueue = new LinkedBlockingQueue(10);
        /**
         * 向资源池中添加资源
         */
        public void add(){
            try {
                resourceQueue.put(1);
                System.out.println("生产者" + Thread.currentThread().getName()
                        + "生产一件资源," + "当前资源池有" + resourceQueue.size() + 
                        "个资源");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        /**
         * 向资源池中移除资源
         */
        public void remove(){
            try {
                resourceQueue.take();
                System.out.println("消费者" + Thread.currentThread().getName() + 
                        "消耗一件资源," + "当前资源池有" + resourceQueue.size() 
                        + "个资源");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    参考文章:

    1.https://blog.csdn.net/u011109589/article/details/80519863

    2.https://www.cnblogs.com/chentingk/p/6497107.html

    3.http://ifeve.com/producers-and-consumers-mode/(并发编程网--创始人:方腾飞)

    4.https://www.cnblogs.com/fankongkong/p/7339848.html

    展开全文
  • 三种方式实现生产者-消费者模型

    千次阅读 2019-11-17 09:40:08
    生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和...

    前言

    生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

    看完了定义,相信懵逼的依然懵逼,那我就来说人话吧。
    生产者消费者模型需要抓住“三个主体,三个要点“,三个主体是指:生产者消费者缓冲区。生产者往缓冲区放数据,消费者从缓冲区取数据。
    生产者-消费者模型
    整个模型大致就是上面图示的结构。
    三个要点是指:

    • 缓冲区有固定大小
    • 缓冲区满时,生产者不能再往缓冲区放数据(产品),而是被阻塞,直到缓冲区不是满的
    • 缓冲区为空时,消费者不能再从缓冲区取数据,而是被阻塞,直到缓冲区不是空的。

    因为数据(产品)往往是先生产出来的先被消费。所以缓冲区一般用有界队列实现,又由于生产者、消费者在特定情况下需要被阻塞,所以更具体一点,缓冲区一般用有界阻塞队列来实现。
    本篇用三种方式实现生产者-消费者模型:wait/notify + 队列、Lock/Condition + 队列、有界阻塞队列。

    wait/notify + 队列

    实现生产者-消费者模型,主要是实现两个核心方法:往缓冲区中放元素、从缓冲区中取元素。
    以下是缓冲区的代码实现,是生产者-消费者模型的核心。

    import java.util.LinkedList;
    import java.util.Queue;
    
    /**
     * wait/notify机制实现生产者-消费者模型
     */
    public class ProducerConsumerQueue<E> {
        /**
         * 队列最大容量
         */
        private final static int QUEUE_MAX_SIZE = 3;
        /**
         * 存放元素的队列
         */
        private Queue<E> queue;
    
        public ProducerConsumerQueue() {
            queue = new LinkedList<>();
        }
    
        /**
         * 向队列中添加元素
         *
         * @param e
         * @return
         */
        public synchronized boolean put(E e) {
            // 如果队列是已满,则阻塞当前线程
            while (queue.size() == QUEUE_MAX_SIZE) {
                try {
                    wait();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
    
            // 队列未满,放入元素,并且通知消费线程
            queue.offer(e);
            System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());
            notify();
            return true;
        }
    
        /**
         * 从队列中获取元素
         * @return
         */
        public synchronized E get() {
            // 如果队列是空的,则阻塞当前线程
            while (queue.isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            // 队列非空,取出元素,并通知生产者线程
            E e = queue.poll();
            System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());
            notify();
            return e;
        }
    }
    

    实现了缓冲区后,对于生产者、消费者线程的实现就比较简单了

    /**
     * 生产者线程
     */
    public class Producer implements Runnable {
    
        private ProducerConsumerQueue<Integer> queue;
    
        public Producer(ProducerConsumerQueue<Integer> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                queue.put(i);
            }
        }
    }
    
    /**
     * 消费者线程
     */
    public class Consumer implements Runnable {
    
        private ProducerConsumerQueue<Integer> queue;
    
        public Consumer(ProducerConsumerQueue<Integer> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                queue.get();
            }
        }
    }
    

    测试代码如下:

    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ProducerConsumerDemo {
    
        private final static ExecutorService service = Executors.newCachedThreadPool();
    
        public static void main(String[] args) throws InterruptedException {
    
            Random random = new Random();
    
            // 生产者-消费者模型缓冲区
            ProducerConsumerQueue<Integer> queue = new ProducerConsumerQueue<>();
    
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
    
            for (int i = 0; i < 3; i++) {
                // 休眠0-50毫秒,增加随机性
                Thread.sleep(random.nextInt(50));
                service.submit(producer);
            }
            for (int i = 0; i < 3; i++) {
                // 休眠0-50毫秒,增加随机性
                Thread.sleep(random.nextInt(50));
                service.submit(consumer);
            }
    
            // 关闭线程池
            service.shutdown();
        }
    }
    

    执行结果(由于执行结果比较长,所以截取部分结果)

    pool-1-thread-1 -> 生产元素,元素个数为:1
    pool-1-thread-1 -> 生产元素,元素个数为:2
    pool-1-thread-1 -> 生产元素,元素个数为:3
    pool-1-thread-4 -> 消费元素,元素个数为:2
    pool-1-thread-1 -> 生产元素,元素个数为:3
    pool-1-thread-4 -> 消费元素,元素个数为:2
    pool-1-thread-3 -> 生产元素,元素个数为:3
    pool-1-thread-4 -> 消费元素,元素个数为:2
    pool-1-thread-4 -> 消费元素,元素个数为:1
    pool-1-thread-4 -> 消费元素,元素个数为:0
    pool-1-thread-2 -> 生产元素,元素个数为:1
    pool-1-thread-2 -> 生产元素,元素个数为:2
    pool-1-thread-2 -> 生产元素,元素个数为:3
    pool-1-thread-4 -> 消费元素,元素个数为:2
    ......
    

    虽然是部分结果,但是依然可以看出几点:

    • 由于队列的最大长度是3(QUEUE_MAX_SIZE),所以缓冲区元素不会超过3,说明缓冲区满时,生产者确实被阻塞了
    • 缓冲区元素个数最小为0,不会出现负数,说明缓冲区为空时,消费者被阻塞了

    这就是生产者-消费者模型基于wait/notify+队列的基本实现。

    Lock/Condition + 队列

    同样,核心部分缓冲区的实现代码实现如下:

    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Lock/Condition实现生产者-消费者模型
     */
    public class ProducerConsumerQueue<E> {
        /**
         * 队列最大容量
         */
        private final static int QUEUE_MAX_SIZE = 3;
        /**
         * 存放元素的队列
         */
        private Queue<E> queue;
    
        private final Lock lock = new ReentrantLock();
        private final Condition producerCondition = lock.newCondition();
        private final Condition consumerCondition = lock.newCondition();
    
        public ProducerConsumerQueue() {
            queue = new LinkedList<>();
        }
    
        /**
         * 向队列中添加元素
         * @param e
         * @return
         */
        public boolean put(E e) {
            final Lock lock = this.lock;
            lock.lock();
            try {
                while (queue.size() == QUEUE_MAX_SIZE) {
                    // 队列已满
                    try {
                        producerCondition.await();
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
                queue.offer(e);
                System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());
                consumerCondition.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }
    
        /**
         * 从队列中取出元素
         * @return
         */
        public E get() {
            final Lock lock = this.lock;
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    // 队列为空
                    try {
                        consumerCondition.await();
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
                E e = queue.poll();
                System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());
                producerCondition.signal();
                return e;
            } finally {
                lock.unlock();
            }
        }
    }
    

    可以看到,代码基本和wait/notify实现方式一致,基本只是API的不同而已。生产者线程、消费者线程、测试代码更是和wait/notify方式一致,所以就不赘述了。

    有界阻塞队列

    同样,缓冲区的实现也是其核心部分,不过阻塞队列已经提供了相应的阻塞API,所以不需要额外编写阻塞部分的代码

    /**
     * 阻塞队列实现生产者-消费者模型
     * 对应的阻塞方法是put()/take()
     */
    public class ProducerConsumerQueue<E> {
    
        /**
         * 队列最大容量
         */
        private final static int QUEUE_MAX_SIZE = 3;
        /**
         * 存放元素的队列
         */
        private BlockingQueue<E> queue;
    
        public ProducerConsumerQueue() {
            queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
        }
    
        /**
         * 向队列中添加元素
         * @param e
         * @return
         */
        public boolean put(E e) {
            try {
                queue.put(e);
                System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            return true;
        }
    
        /**
         * 从队列中取出元素
         * @return
         */
        public E get() {
            try {
                E e = queue.take();
                System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());
                return e;
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            return null;
        }
    }
    

    生产者线程、消费者线程、测试代码也和前面两种一模一样。

    总结

    通过三种方式实现生产者-消费者模型,可以看出使用阻塞队列的方式最简单,也更安全。其实看看阻塞队列的源码,会发现其内部的实现和这里的前两种差不多,只是JDK提供的阻塞队列健壮性更好。

    说完了三种实现方式,再来说说为什么要使用生产者-消费者模式,消费者直接调用生产者不好吗?
    回顾文章开始的那张图,试想一下,如果没有生产者-消费者模式会怎样,大概会变成如下这样
    没有生产者-消费者模式
    可以看到,三个生产者,三个消费者就会产生 3 * 3 = 9条调用关系(箭头方法代表数据走向),还有一点就是消费者也有可能还是生产者,生产者也有可能还是消费者,一旦生产者、消费者的数量多了之后就会形成复杂的调用网。所以生产者-消费者模型的最大好处就是解耦
    其次如果生产者和消费者的速度上有较大的差异,就一定会存在一方总是在等待另一方的情况。比如快递小哥如果每一个快递都必须直接送到用户手上,如果某个用户一直联系不上,或者说过了很久才取快递,那么快递小哥就只能一直等待。所以就出现了快递站,快递小哥只需要把快递放在指定位置,用户去指定位置取就行了。所以生产者-消费者模型的第二个好处就是平衡生产能力和消费能力的差异

    以上就是本篇关于生产者-消费者模型的全部内容。

    展开全文
  • Java可视化实现生产者消费者问题

    千次阅读 2019-09-28 22:14:06
    引言:生产者消费者问题是一个十分经典的多线程问题。为了更加形象地描述这个问题,采用可视化的形式展示此过程。
  • 生产者/消费者模式的理解及实现

    万次阅读 多人点赞 2018-05-31 10:05:37
    ★简介 生产者消费者模式并不是GOF提出的23种设计模式之一,23种设计模式都是建立在面向对象的基础之上的,但其实面向过程的编程中也有很多高效的编程模式,生产者消费者模式便是其中之一,它是我们编程过程中最...
  • 生产者消费者模型java实现

    万次阅读 2018-09-12 10:33:45
    做题的时候遇到了生产者消费者问题,这个问题可以说是线程学习的经典题目了,就忍不住研究了一波。它描述是有一块缓冲区(队列实现)作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。在Java中这...
  • 操作系统_生产者消费者问题

    千次阅读 多人点赞 2020-03-17 21:29:53
    1,生产者消费者问题 问题的提出 初步思考 进程资源共享关系和同步关系分析 问题的具体解决 第一搏 存在的问题 第二搏 多维度思考 1,单生产者、单消费者、多缓冲区 2,多生产者、多消费者、单缓冲 3,单...
  • Java实现生产者和消费者模式

    万次阅读 多人点赞 2019-10-26 18:39:07
    生产者、消费者模型会出现的问题出发,谈了一下对生产者、消费者模型的理解,并配有完整的代码实现。
  • Java多种方式解决生产者消费者问题(十分详细)

    万次阅读 多人点赞 2018-08-16 08:40:50
    生产者消费者问题 一、问题描述 生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,...
  • Java实现Kafka生产者和消费者的示例

    万次阅读 多人点赞 2021-01-05 10:06:08
    Java实现Kafka生产者和消费者的示例
  • 经典的进程同步问题-----生产者-消费者问题详解

    千次阅读 多人点赞 2019-11-25 08:32:47
    经典的进程同步问题-----生产者-消费者问题详解 ​ 本文和接下来几篇博文是对上篇文章(进程同步机制)的一次实践,通过具体的例子来加深理论的理解,会用三个经典的进程同步问题来进行讲解,并且会配有伪代码和...
  • Java线程实现生产者消费者模式

    千次阅读 2018-06-11 20:24:46
    1 什么是生产者消费者模式想一个现实生活中的例子,啤酒商---超市---消费者也就是我们,啤酒商生产了啤酒,然后将啤酒销售给了超市,我们消费之又会到超市将啤酒买回来自己喝,那么啤酒商和消费者之间是什么关系呢?...
  • C语言——生产者消费者问题

    万次阅读 2020-05-06 16:01:28
    百度文献查看原文 核心代码: #include #include #include #include #include #define dataBufferSize 2 //缓冲区数目 #define processNum 4 //进程数量(生产者、消费者进程总数目) typedef struct Seamphore //信号...
  • Java——生产者-消费者问题(GUI)

    万次阅读 多人点赞 2020-05-01 19:02:43
    设计一个模拟仿真“生产者-消费者”问题的解决过程及方法的程序
  • RabbitMQ:消费者和生产者

    千次阅读 2019-06-12 15:37:08
    生产者(producer)创建消息,然后发布(发送)到代理服务器(RabbitMQ)。什么是消息呢?消息包含两部分内容:有效载荷(payload)和标签(label)。有效载荷就是你想要传输的数据。他可以是任何内容,一个JSON数组...
  • 关于生产者消费者模式的C#实现

    千次阅读 2018-12-25 09:12:39
    记录一下用C#实现生产者消费者模式吧。 先介绍一下这个模式,简而言之就是生产者(可能有数个)生产东西,消费者(可能有数个)消费前面生产的东西。举个生活中的例子就是苹果有好几个厂家(生产者)生产iphone,线...
  •   以生产者和消费者问题为例,学习并熟悉Linux下进程通信、同步机制的具体实现方法,主要是了解并掌握信号量机制和共享内存的使用方法,进一步熟悉Linux系统的相关指令的调用。 【实验内容】   使用共享内存和...
  • 生产者消费者问题-代码详解(Java多线程)

    千次阅读 多人点赞 2020-06-13 11:10:07
    文章目录一、生产者消费者问题二、代码实现三、拓展知识 一、生产者消费者问题 生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的...
  • python3实现生产者消费者模型

    千次阅读 2019-02-03 13:48:27
    生产者消费者模型 一个模型,可以用来搭建消息队列。 主要用于解决·两个线程之间速度不匹配问题,在两个线程之间建立一个缓存空间。 用一个队列来充当缓存空间,如果生产者生产过快就让生产者线程等一等,若消费者...
  • 管程,生产者消费者

    千次阅读 2018-05-01 21:10:09
    假设将生产者代码中的两个down操作交换一下次序,将使得mutex的值在empty之前而不是在其之后被减1。如果缓冲区完全满了,生产者将阻塞,mutex值为0。这样一来,当消费者下次试图访问缓冲区时,它将对mutex执行一个...
  • 【JAVA多线程】如何解决一个生产者与消费者问题

    万次阅读 多人点赞 2018-11-01 15:28:19
    如何解决一个生产者与消费者问题 生产者与消费者问题是多线程同步的一个经典问题。生产者和消费者同时使用一块缓冲区,生产者生产商品放入缓冲区,消费者从缓冲区中取出商品。我们需要保证的是,当缓冲区满时,生产...
  • Java多线程技术~生产者和消费者问题

    万次阅读 热门讨论 2020-06-16 14:15:18
    Java多线程技术~生产者和消费者问题 本文是上一篇文章的后续,详情点击该连接 线程通信 应用场景:生产者和消费者问题        假设仓库中只能存放一件产品,生产者将生产出来的...
  • 生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞...
  • 今天主要是来说一下如何在Springboot中使用redis实现一个 生产者/消费者模式的队列, 首先解释下几个问题。 1、什么是生产者/消费者模式? 消息队列一般是有两种场景 1、种是发布者订阅者模式 2、种是生产者消费者...
  • 一个生产者对应多个消费者   同一注册中心 生产者,部署在不同的容器里(如tomcat) 保证注册中心IP一致 保证Dubbo协议端口不一致 示例 生产者1 &lt;?xml version="1.0" encoding="...
  • 操作系统------生产者消费者模型

    千次阅读 2018-04-24 18:06:59
    在“进程间通信----信号量”一文中,有简单介绍过生产者消费者模型的基本概念。在下文中将使用有关线程的互斥与同步的相关概念来实现两种不同类型的生产者消费者模型。在本文中侧重于线程间同步的实现。有关线程互斥...
  • Kafka生产者和消费者的工作原理

    万次阅读 2020-08-06 19:52:15
    探究Kafka生产者的工作原理 主题和日志 对于每个主题,Kafka群集都会维护一个分区日志,如下所示: 每个分区(Partition)都是有序的(所以每一个Partition内部都是有序的),不变的记录序列,这些记录连续地附加到...
  • 生产者/消费者问题的实现: 掌握进程、线程的概念,熟悉相关的控制语; 掌握进程、线程间的同步原理和方法; 掌握进程、线程间的互斥原理和方法; 掌握使用信号量原语解决进程、线程间互斥和同步方法。 实验原理 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 722,145
精华内容 288,858
关键字:

生产者