精华内容
下载资源
问答
  • 三种方式实现生产者-消费者模型

    千次阅读 2019-11-17 09:40:08
    生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和...

    前言

    生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

    看完了定义,相信懵逼的依然懵逼,那我就来说人话吧。
    生产者消费者模型需要抓住“三个主体,三个要点“,三个主体是指:生产者消费者缓冲区。生产者往缓冲区放数据,消费者从缓冲区取数据。
    生产者-消费者模型
    整个模型大致就是上面图示的结构。
    三个要点是指:

    • 缓冲区有固定大小
    • 缓冲区满时,生产者不能再往缓冲区放数据(产品),而是被阻塞,直到缓冲区不是满的
    • 缓冲区为空时,消费者不能再从缓冲区取数据,而是被阻塞,直到缓冲区不是空的。

    因为数据(产品)往往是先生产出来的先被消费。所以缓冲区一般用有界队列实现,又由于生产者、消费者在特定情况下需要被阻塞,所以更具体一点,缓冲区一般用有界阻塞队列来实现。
    本篇用三种方式实现生产者-消费者模型:wait/notify + 队列、Lock/Condition + 队列、有界阻塞队列。

    wait/notify + 队列

    实现生产者-消费者模型,主要是实现两个核心方法:往缓冲区中放元素、从缓冲区中取元素。
    以下是缓冲区的代码实现,是生产者-消费者模型的核心。

    import java.util.LinkedList;
    import java.util.Queue;
    
    /**
     * wait/notify机制实现生产者-消费者模型
     */
    public class ProducerConsumerQueue<E> {
        /**
         * 队列最大容量
         */
        private final static int QUEUE_MAX_SIZE = 3;
        /**
         * 存放元素的队列
         */
        private Queue<E> queue;
    
        public ProducerConsumerQueue() {
            queue = new LinkedList<>();
        }
    
        /**
         * 向队列中添加元素
         *
         * @param e
         * @return
         */
        public synchronized boolean put(E e) {
            // 如果队列是已满,则阻塞当前线程
            while (queue.size() == QUEUE_MAX_SIZE) {
                try {
                    wait();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
    
            // 队列未满,放入元素,并且通知消费线程
            queue.offer(e);
            System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());
            notify();
            return true;
        }
    
        /**
         * 从队列中获取元素
         * @return
         */
        public synchronized E get() {
            // 如果队列是空的,则阻塞当前线程
            while (queue.isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            // 队列非空,取出元素,并通知生产者线程
            E e = queue.poll();
            System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());
            notify();
            return e;
        }
    }
    

    实现了缓冲区后,对于生产者、消费者线程的实现就比较简单了

    /**
     * 生产者线程
     */
    public class Producer implements Runnable {
    
        private ProducerConsumerQueue<Integer> queue;
    
        public Producer(ProducerConsumerQueue<Integer> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                queue.put(i);
            }
        }
    }
    
    /**
     * 消费者线程
     */
    public class Consumer implements Runnable {
    
        private ProducerConsumerQueue<Integer> queue;
    
        public Consumer(ProducerConsumerQueue<Integer> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                queue.get();
            }
        }
    }
    

    测试代码如下:

    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ProducerConsumerDemo {
    
        private final static ExecutorService service = Executors.newCachedThreadPool();
    
        public static void main(String[] args) throws InterruptedException {
    
            Random random = new Random();
    
            // 生产者-消费者模型缓冲区
            ProducerConsumerQueue<Integer> queue = new ProducerConsumerQueue<>();
    
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
    
            for (int i = 0; i < 3; i++) {
                // 休眠0-50毫秒,增加随机性
                Thread.sleep(random.nextInt(50));
                service.submit(producer);
            }
            for (int i = 0; i < 3; i++) {
                // 休眠0-50毫秒,增加随机性
                Thread.sleep(random.nextInt(50));
                service.submit(consumer);
            }
    
            // 关闭线程池
            service.shutdown();
        }
    }
    

    执行结果(由于执行结果比较长,所以截取部分结果)

    pool-1-thread-1 -> 生产元素,元素个数为:1
    pool-1-thread-1 -> 生产元素,元素个数为:2
    pool-1-thread-1 -> 生产元素,元素个数为:3
    pool-1-thread-4 -> 消费元素,元素个数为:2
    pool-1-thread-1 -> 生产元素,元素个数为:3
    pool-1-thread-4 -> 消费元素,元素个数为:2
    pool-1-thread-3 -> 生产元素,元素个数为:3
    pool-1-thread-4 -> 消费元素,元素个数为:2
    pool-1-thread-4 -> 消费元素,元素个数为:1
    pool-1-thread-4 -> 消费元素,元素个数为:0
    pool-1-thread-2 -> 生产元素,元素个数为:1
    pool-1-thread-2 -> 生产元素,元素个数为:2
    pool-1-thread-2 -> 生产元素,元素个数为:3
    pool-1-thread-4 -> 消费元素,元素个数为:2
    ......
    

    虽然是部分结果,但是依然可以看出几点:

    • 由于队列的最大长度是3(QUEUE_MAX_SIZE),所以缓冲区元素不会超过3,说明缓冲区满时,生产者确实被阻塞了
    • 缓冲区元素个数最小为0,不会出现负数,说明缓冲区为空时,消费者被阻塞了

    这就是生产者-消费者模型基于wait/notify+队列的基本实现。

    Lock/Condition + 队列

    同样,核心部分缓冲区的实现代码实现如下:

    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Lock/Condition实现生产者-消费者模型
     */
    public class ProducerConsumerQueue<E> {
        /**
         * 队列最大容量
         */
        private final static int QUEUE_MAX_SIZE = 3;
        /**
         * 存放元素的队列
         */
        private Queue<E> queue;
    
        private final Lock lock = new ReentrantLock();
        private final Condition producerCondition = lock.newCondition();
        private final Condition consumerCondition = lock.newCondition();
    
        public ProducerConsumerQueue() {
            queue = new LinkedList<>();
        }
    
        /**
         * 向队列中添加元素
         * @param e
         * @return
         */
        public boolean put(E e) {
            final Lock lock = this.lock;
            lock.lock();
            try {
                while (queue.size() == QUEUE_MAX_SIZE) {
                    // 队列已满
                    try {
                        producerCondition.await();
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
                queue.offer(e);
                System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());
                consumerCondition.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }
    
        /**
         * 从队列中取出元素
         * @return
         */
        public E get() {
            final Lock lock = this.lock;
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    // 队列为空
                    try {
                        consumerCondition.await();
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
                E e = queue.poll();
                System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());
                producerCondition.signal();
                return e;
            } finally {
                lock.unlock();
            }
        }
    }
    

    可以看到,代码基本和wait/notify实现方式一致,基本只是API的不同而已。生产者线程、消费者线程、测试代码更是和wait/notify方式一致,所以就不赘述了。

    有界阻塞队列

    同样,缓冲区的实现也是其核心部分,不过阻塞队列已经提供了相应的阻塞API,所以不需要额外编写阻塞部分的代码

    /**
     * 阻塞队列实现生产者-消费者模型
     * 对应的阻塞方法是put()/take()
     */
    public class ProducerConsumerQueue<E> {
    
        /**
         * 队列最大容量
         */
        private final static int QUEUE_MAX_SIZE = 3;
        /**
         * 存放元素的队列
         */
        private BlockingQueue<E> queue;
    
        public ProducerConsumerQueue() {
            queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
        }
    
        /**
         * 向队列中添加元素
         * @param e
         * @return
         */
        public boolean put(E e) {
            try {
                queue.put(e);
                System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            return true;
        }
    
        /**
         * 从队列中取出元素
         * @return
         */
        public E get() {
            try {
                E e = queue.take();
                System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());
                return e;
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            return null;
        }
    }
    

    生产者线程、消费者线程、测试代码也和前面两种一模一样。

    总结

    通过三种方式实现生产者-消费者模型,可以看出使用阻塞队列的方式最简单,也更安全。其实看看阻塞队列的源码,会发现其内部的实现和这里的前两种差不多,只是JDK提供的阻塞队列健壮性更好。

    说完了三种实现方式,再来说说为什么要使用生产者-消费者模式,消费者直接调用生产者不好吗?
    回顾文章开始的那张图,试想一下,如果没有生产者-消费者模式会怎样,大概会变成如下这样
    没有生产者-消费者模式
    可以看到,三个生产者,三个消费者就会产生 3 * 3 = 9条调用关系(箭头方法代表数据走向),还有一点就是消费者也有可能还是生产者,生产者也有可能还是消费者,一旦生产者、消费者的数量多了之后就会形成复杂的调用网。所以生产者-消费者模型的最大好处就是解耦
    其次如果生产者和消费者的速度上有较大的差异,就一定会存在一方总是在等待另一方的情况。比如快递小哥如果每一个快递都必须直接送到用户手上,如果某个用户一直联系不上,或者说过了很久才取快递,那么快递小哥就只能一直等待。所以就出现了快递站,快递小哥只需要把快递放在指定位置,用户去指定位置取就行了。所以生产者-消费者模型的第二个好处就是平衡生产能力和消费能力的差异

    以上就是本篇关于生产者-消费者模型的全部内容。

    展开全文
  • 经典的进程同步问题-----生产者-消费者问题详解

    千次阅读 多人点赞 2019-11-25 08:32:47
    经典的进程同步问题-----生产者-消费者问题详解 ​ 本文和接下来几篇博文是对上篇文章(进程同步机制)的一次实践,通过具体的例子来加深理论的理解,会用三个经典的进程同步问题来进行讲解,并且会配有伪代码和...

    经典的进程同步问题-----生产者-消费者问题详解

    ​ 本文和接下来几篇博文是对上篇文章(进程同步机制)的一次实践,通过具体的例子来加深理论的理解,会用三个经典的进程同步问题来进行讲解,并且会配有伪代码和Java实践(使用多线程模拟),深入的进行讲解。

    ​ 进程同步问题是一个非常重要且相当有趣的问题,因而吸引了很多学者对他进行研究,比如在前几篇博客中提到的老熟人迪杰斯特拉,由此也产生了一系列经典的进程同步问题,本文就选取其中较为代表性的生产者-消费者问题来进行学习,以帮助我们更好的理解进程同步的概念及实现方法。

    ​ 生产者-消费者问题是相互合作的进程关系的一种抽象,例如,在输入时,输入进程是生产者,计算进程是消费者;而在输出时,则计算进程是生产者,而打印进程是消费者,因此该问题有很大的代表性及实用价值。并且现在很多技术都使用到了,最典型的就是消息队列。

    ​ 本文主要是对经典进程同步问题的求解,因此在这里先给出一个解题思路(仅代表个人意见、如有疑问,可评论区讨论):

    1. 分析清楚题目涉及的进程间制约关系;
    2. 设置信号量(包括信号量的个数和初值,写出信号量物理含义);
    3. 给出进程相应程序的算法描述或流程控制,并把P、V操作加到程序适当之处。

    1.问题描述

    ​ 有一群生产者进程在生产产品,并将这些产品提供给消费者进程进行消费。为了使生产这进程与消费者进程能并发的执行,在两者之间设置了一个具有n个缓冲区的缓冲池,生产者进程将其生产的产品放到一个缓冲区(缓冲池中的一个存储单位)中;消费者进程可从一个缓冲区中取走产品去消费。

    ​ 需要注意的是,尽管所有的生产者和消费者都是以异步的方式运行的,但是他们之间必须保持同步,既不允许消费者进程在缓冲区为空时去取产品,也不允许生产者进程在缓冲区已满且产品尚未被取走时向缓冲区投放产品。

    2.问题分析

    1. 缓冲池一次只能有一个进程访问;
    2. 只要缓冲池未满,生产者就可以把产品送入缓冲区;
    3. 只要缓冲池未空,消费者就可以从缓冲区中取走产品。

    ​ 下图是一个生产者与消费者进程执行的流程图,从图中我们可以很清晰的看到上述的三个进程间的关系,其中生产者和消费者中操作缓冲区都需要先进行申请,也就是我们说的进入区,操作完成后要执行释放,也就是退出区,通过这样来实现对缓冲池的互斥访问。通过图中的贯通两个进程的虚线来实现生产者和消费者的同步关系。

    3.信号量设置

    ​ 由于有界缓冲池是一个临界资源,必须互斥使用,这时可以利用互斥信号量mutex实现诸进程对缓冲池的互斥使用。因为是互斥信号量,所以mutex初值为1。

    ​ 另外,可以设置两个同步信号量:一个表示缓冲池中空缓冲区的数目,用empty表示,初值为缓冲池的大小n;另一个表示已满缓冲区的数目,即缓冲池中消息的数量,用full表示,初值为0。

    ​ 除了信号量外,我们可以使用循环链表来表示有界缓冲池,假设缓冲池的大小为n,我们用buffer[n]来表示,另外还有两个队首指针in和队尾指针out,其初值都为0。

    4.记录型信号量解决生产者-消费者问题

    ​ 首先我们先使用记录型信号量来解决生产者-消费者问题,根据上面的分析,我们先给出伪代码:

    int in=0,out=0;
    //item为消息的结构体
    item buffer[n];                       
    semaphore mutex=1,empty=n,full=0;     //初始化信号量
    
    void producer(){
      do {
      	//生产者产生一条消息
        producer an item nextp;
        ...
        //判断缓冲池中是否仍有空闲的缓冲区
        P(empty);
        //判断是否可以进入临界区(操作缓冲池)
        P(mutex);
        //向缓冲池中投放消息
        buffer[in] = nextp;
        //移动入队指针
        in = (in + 1) % n;
        //退出临界区,允许别的进程操作缓冲池
        V(mutex);
        //缓冲池中非空的缓冲区数量加1,可以唤醒等待的消费者进程
        V(full);
      }while(true);
    }
    
    void consumer(){
      do {
      	//判断缓冲池中是否有非空的缓冲区(消息)
        P(full); 
        //判断是否可以进入临界区(操作缓冲池)
        P(mutex);
        //从缓冲池中取出消息
        nextc = buffer[out];
        //移动出队指针
        out = (out + 1) % n;
        //退出临界区,允许别的进程操作缓冲池
        V(mutex);
        //缓冲池中空缓冲区数量加1,可以唤醒等待的生产者进程
        V(empty);
        //消费消息
        consumer the item in nextc;
        ...
      }while(true);
    }
    

    ​ 通过上面的伪代码,我们可以看到,在每个程序中用于实现互斥的P(mutex)和V(mutex)必须成对的出现,并且要出现在同一个程序中;对于用于控制进程同步的信号量full和empty,其P、V操作也必须要成对的出现,但他们分别处于不同的程序之中。还有比较重要的就是,每个程序中的多个P操作顺序不能颠倒,比如说生产者进程,应先执行对资源信号量的P操作–P(empty),再执行对互斥信号量的P操作–P(mutex),否则可能会因为持有了互斥锁,但是没有空闲的缓冲区而导致生产者进程阻塞,但是别的进程又无法进入临界区,导致进程发生死锁。

    ​ 下面给出对应的Java 多线程的实现,为了简单,临界缓冲池我使用了一个长度为50的list来模拟,代码如下:

    /**
     * 记录型信号量
     */
    static final Semaphore mutex = new Semaphore(1);
    
    static List<Integer> buffer = new ArrayList<>();
    
    static final Semaphore empty = new Semaphore(50);
    static final Semaphore full = new Semaphore(0);			//数据定义
    
    static Integer count = 0;
    
    static class Producer extends Thread {
      Producer(String name) {
        super.setName(name);
      }
    
      @Override
      public void run() {
        do {
          try {
          	//判断缓冲池中是否仍有空闲的缓冲区
            empty.acquire();
            //判断是否可以进入临界区(操作缓冲池)
            mutex.acquire();
            log.info("生产了一条消息:【{}】", count);
            //向缓冲池中投放消息
            buffer.add(count++);
            //Thread.sleep(1000);
            //释放资源
            full.release();
            mutex.release();
          } catch (InterruptedException e) {
            log.error("生产消息时产生异常!");
          }
        } while (true);
      }
    }
    
    static class Consumer extends Thread {
      Consumer(String name) {
        super.setName(name);
      }
    
      @Override
      public void run() {
        do {
          try {
    		//判断缓冲池中是否仍有空闲的缓冲区
            full.acquire();	
            //判断是否可以进入临界区(操作缓冲池)		
            mutex.acquire();			
            log.info("消费了一条消息:【{}】", buffer.remove(0));
            //Thread.sleep(1000);
            empty.release();
            mutex.release();
          } catch (InterruptedException e) {
            log.error("消费消息时产生异常!");
          }
        } while (true);
      }
    }
    
    public static void main(String[] args) {						//测试
      Producer p1 = new Producer("p1");
      Producer p2 = new Producer("p2");
    
      Consumer c1 = new Consumer("c1");
      Consumer c2 = new Consumer("c2");
      p1.start();
      p2.start();
      c1.start();
      c2.start();
    }
    

    5.使用AND型信号量解决生产者-消费者问题

    ​ 对于AND型信号量,我们就不做过多的说明了,我们直接给出生产者-消费者问题的伪代码解决,也没有什么变化,主要是信号量申请部分:

    ...                                   //定义信号量并初始化
    
    void producer(){
      do {
      	//生产者产生一条消息
        producer an item nextp;
        ...
        //判断缓冲池中是否仍有空闲的缓冲区&&是否可以进入临界区
        Swait(empty, mutex);
        //向缓冲池中投放消息
        buffer[in] = nextp;
        //移动入队指针
        in = (in + 1) % n;
        //退出临界区&&消息数量加1,可以唤醒等待的消费者进程
        Ssignal(mutex, full);
      }while(true);
    }
    
    void consumer(){
      do {
      	//判断缓冲池中是否有消息&&是否可以进入临界区
        Swait(full, mutex);
        //从缓冲池中取出消息
        nextc = buffer[out];
        //移动出队指针
        out = (out + 1) % n;
        //退出临界区&&缓冲池中空缓冲区数量加1,可以唤醒等待的生产者进程
        Ssignal(mutex, empty);
        //消费消息
        consumer the item in nextc;	
        ...
      }while(true);
    }
    

    ​ 对应的Java代码实现可以参看我的另一篇博文

    6.使用信号量集解决生产者-消费者问题

    ​ 信号量集是对AND型型号量的一次扩展,其代码不同的就在于wait操作可以一次申请多个资源和可以设置资源分配下限,下面是伪代码:

    ...                                   //定义信号量并初始化
    
    void producer(){
      do {
        producer an item nextp;
        ...
        //判断缓冲池中是否仍有空闲的缓冲区&&是否可以进入临界区
        Swait(empty, 1, 1, mutex, 1, 1);
        //向缓冲池中投放消息
        buffer[in] = nextp;	
        //移动入队指针
        in = (in + 1) % n;
        //退出临界区&&消息数量加1,可以唤醒等待的消费者进程
        Ssignal(mutex, 1, full, 1);	
      }while(true);
    }
    
    void consumer(){
      do {
      	//判断缓冲池中是否有消息&&是否可以进入临界区
        Swait(full, 1 ,1 , mutex, 1, 1); 
        //从缓冲池中取出消息
        nextc = buffer[out];
        //移动出队指针
        out = (out + 1) % n;
        //退出临界区&&消息数量减1,可以唤醒等待的生产者进程
        Ssignal(mutex, 1, empty, 1);
        //消费消息
        consumer the item in nextc;
        ...
      }while(true);
    }
    

    ​ 对应的Java代码实现可以参看我的另一篇博文

    7.使用管程解决生产者-消费者问题

    ​ 对管程,我们在前面的文章中详细的进行了介绍,在这里也使用管程来解决实际的问题,伪代码如下(我们对于管程内部的过程进行简单的描述):

    Monitor ProducerConsumer {
      intem buffer[N];
      int in=0,out=0;
      condition notempty,notfull;//条件变量 非空和非满
      int count=0;//缓冲区中消息的数量
      
      //放消息的过程
      void put(item x){
      	//如果缓冲池满了,则将线程阻塞到notfull中
        if(count >= N){
          cwait(notfull);
        }
        //向缓冲池中投放消息
        buffer[in] = nextp;
        //移动入队指针
        in = (in + 1) % N;
        //缓冲区中消息加一
       	count++;
       	//唤醒因为没消息消费阻塞的消费者线程
        signal(notempty);
    	}
      
      //取消息的过程
      void get(item x){
      	//如果缓冲池为空,则将线程阻塞到notempty中
    	if(count <= 0){
          cwait(notempty);
        }
        x = buffer[out];//从缓冲池中取出消息
        out = (out + 1) % n;//移动出队指针
       	count--;//缓冲区中消息减一
        signal(notfull);//唤醒因为没空间存放消息阻塞的生产者线程
      }
    }PC;
    
    void producer(){
      do {
      	//生产者产生一条消息
        producer an item nextp;
        ...
        PC.put(nextp);//将消息放入到缓冲池中
      }while(true);
    }
    
    void consumer(){
      item x;
      do {
        PC.get(x);//从缓冲池中取出消息
        consumer the item in nextc;//消费消息
        ...
      }while(true);
    }
    

    ​ 对应的Java代码实现可以参看我的另一篇博文


    ​ 又到了分隔线以下,本文到此就结束了,本文内容全部都是由博主自己进行整理并结合自身的理解进行总结,如果有什么错误,还请批评指正。

    ​ 本文的所有java代码都已通过测试,对其中有什么疑惑的,可以评论区留言,欢迎你的留言与讨论;另外原创不易,如果本文对你有所帮助,还请留下个赞,以表支持。

    ​ 如有兴趣,还可以查看我的其他几篇博客,都是OS的干货(目录),喜欢的话还请点赞、评论加关注_

    参考文章列表:

    1.进程同步机制-----为进程并发执行保驾护航

    2.Java并发编程(JUC)模拟AND型信号量

    3.Java并发编程(JUC)模拟信号量集

    4.Java并发编程模拟管程(霍尔Hoare管程、汉森Hansan管程、MESA管程)

    5.操作系统武功修炼心法

    6.经典进程同步问题----哲学家进餐问题详解

    7.经典进程同步问题----读者-写者问题详解

    展开全文
  • 常见的生产者消费者实现一般是Unix或Linux下的使用Pthread完成的,这里使用WINDOWS环境下的API实现。 #include<iostream> #include<queue> #include<Windows.h> #include<...

    生产者-消费者问题是很经典的线程同步问题,这段代码给出的是多生产者、多消费者、有界缓冲区的一个C++实现。
    常见的生产者消费者实现一般是Unix或Linux下的使用Pthread完成的,这里使用WINDOWS环境下的API实现。

    #include<iostream>
    #include<queue>
    #include<Windows.h>
    #include<unordered_map>
    
    using namespace std;
    static const int QUEUE_SIZE = 10;
    static HANDLE semEmpty = NULL;
    static HANDLE semFull = NULL;
    
    static const int PRODUCER_SIZE = 3;
    static const int CONSUMER_SIZE = 10;
    static HANDLE event = NULL;
    static CRITICAL_SECTION queueMutex;//临界区锁,用于维护临界区,保护bufferQueue
    static queue<int> bufferQueue;
    static int goodsNum = 0;//当前生产商品的序号
    unordered_map<DWORD,int> id;//hashmap用来记录线程id与进程序号的映射
    
    DWORD WINAPI ProducerThread()
    {
    	while (true)
    	{
    		if (WAIT_TIMEOUT != WaitForSingleObject(event, 0))
    			break; 
    
    		WaitForSingleObject(semEmpty, INFINITE);
    		EnterCriticalSection(&queueMutex);
    		bufferQueue.push(goodsNum++);
    		cout << "生产者"<< id[GetCurrentThreadId()] <<"正在生产" << bufferQueue.back()<<"号商品"<< endl;
    		LeaveCriticalSection(&queueMutex);
    		ReleaseSemaphore(semFull, 1, NULL);
    	}
    	return 0;
    }
    
    DWORD WINAPI ConsumerThread()
    {
    	while (true)
    	{
    		if (WAIT_TIMEOUT != WaitForSingleObject(event, 0))
    			break; 
    
    		WaitForSingleObject(semFull, INFINITE);
    		EnterCriticalSection(&queueMutex);
    		cout << "\t\t\t消费者" << id[GetCurrentThreadId()] << "正在消费" << bufferQueue.front() << "号商品" << endl;
    		bufferQueue.pop();
    		LeaveCriticalSection(&queueMutex);
    		ReleaseSemaphore(semEmpty, 1, NULL);
    	}
    	return 0;
    }
    
    int main(void)
    {
    	HANDLE ThreadArray[CONSUMER_SIZE+PRODUCER_SIZE];
    	InitializeCriticalSection(&queueMutex); // queue读写同步
    	event = CreateEvent(NULL,true,false,NULL); // 手动控制、初始为非激发状态
    	
    	semEmpty = CreateSemaphore(NULL, QUEUE_SIZE, QUEUE_SIZE, NULL); 
    	semFull = CreateSemaphore(NULL, 0, QUEUE_SIZE, NULL);
    	
    	for (int i = 0; i < PRODUCER_SIZE; i++)
    	{
    		DWORD tid;
    		ThreadArray[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProducerThread, NULL, 0, &tid);
    		pair<DWORD, int> pa;
    		pa.first = tid;
    		pa.second = i+1;
    		id.insert(pa);
    	}
    	for (int i = 0; i < CONSUMER_SIZE; i++)
    	{
    		DWORD tid;
    		ThreadArray[PRODUCER_SIZE+i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ConsumerThread, NULL, 0, &tid);
    		pair<DWORD, int> pa;
    		pa.first = tid;
    		pa.second = i+1;
    		id.insert(pa);
    	}
    	Sleep(10000);
    	SetEvent(event);//10秒后退出生产者、消费者线程
    	WaitForMultipleObjects(PRODUCER_SIZE+CONSUMER_SIZE, ThreadArray, TRUE, INFINITE); 
    	for (int i = 0; i < PRODUCER_SIZE+CONSUMER_SIZE; i++)
    		CloseHandle(ThreadArray[i]);
    
    	CloseHandle(semEmpty);
    	CloseHandle(semFull);
    	CloseHandle(event);
    
    	DeleteCriticalSection(&queueMutex);
    	system("pause");
    	return 0;
    }
    
    展开全文
  • 经典生产者-消费者问题解析

    千次阅读 2020-04-09 14:35:33
    1.生产者-消费者问题 生产者消费者问题在现实系统中是很普遍的。例如在一个多媒体系统中,生产者编码视频帧,而消费者消费(解码)视频帧,缓冲区的目的就是减少视频流的抖动。又如在图形用户接口设计中,生产者...

    1.生产者-消费者问题

    生产者和消费者问题在现实系统中是很普遍的。例如在一个多媒体系统中,生产者编码视频帧,而消费者消费(解码)视频帧,缓冲区的目的就是减少视频流的抖动。又如在图形用户接口设计中,生产者检测到鼠标和键盘事件,并将其插入到缓冲区中。消费者以某种基于优先级的方式从缓冲区中取出这些事件并显示在屏幕上。

    生产者和消费者模式共享一个有n个槽位的有限缓冲区。生产者反复地生成新的item,并将它插入到缓冲区中。消费者不断从缓冲区中取出这些item,并消费它们。它有3个显著特点:

    • 因为生产和消费都涉及共享变量的更新,所以必须保证对缓冲区的访问是互斥的。
    • 如果缓冲区是满的,那么生产者必须等待直到有一个槽位可用。
    • 如果缓冲区是空的,那么消费者必须等待直到有一个item可以。

    下文将开发一个简单的生产者消费者包SBUF,它操作类型为sbuf_t的有限缓冲区。item存放在一个动态分配的容量为n的整数数组buf中。索引值front和rear分别指向数组的首尾项。三个信号量同步对缓冲区的访问。mutex信号量提供互斥访问,slotsitems信号量分别记录空槽位和可用item的数量。

    typedef struct{
        int *buf;		//缓冲区指针(指向一个数组)
        int n;			//最大空槽位数量(缓冲区大小)
        int front;		//指向数组第一个item,即buf[(front+1)%n]
        int rear;		//指向数组最后一个item,即buf[rear%n]
        sem_t mutex;	//缓冲区互斥锁
        sem_t slots;	//可用槽位数
        sem_t items;	//可用item数
    }sbuf_t;

    Posix标准定义的信号量操作函数:

    #include <semaphore.h>
    int sem_init(sem_t *sem, 0, unsigned int value);
    int sem_wait(sem_t *s);		//等价于P(s)
    int sem_post(sem_t *s);		//等价于V(s)

    下文继续给出SBUF包实现的代码:

    • sbuf_init函数初始化一个缓冲区,在其它任意函数前被调用;
    • sbuf_deinit函数释放一个缓冲区,在其它任意函数后被调用;
    • sbuf_insert函数等待一个可用槽位并添加item;
    • sbuf_remove函数等待并消费一个item;
    /* $begin sbufc */
    #include "csapp.h"
    #include "sbuf.h"
    
    /* Create an empty, bounded, shared FIFO buffer with n slots */
    /* $begin sbuf_init */
    void sbuf_init(sbuf_t *sp, int n)
    {
        sp->buf = Calloc(n, sizeof(int)); 
        sp->n = n;                       /* Buffer holds max of n items */
        sp->front = sp->rear = 0;        /* Empty buffer iff front == rear */
        Sem_init(&sp->mutex, 0, 1);      /* Binary semaphore for locking */
        Sem_init(&sp->slots, 0, n);      /* Initially, buf has n empty slots */
        Sem_init(&sp->items, 0, 0);      /* Initially, buf has zero data items */
    }
    /* $end sbuf_init */
    
    /* Clean up buffer sp */
    /* $begin sbuf_deinit */
    void sbuf_deinit(sbuf_t *sp)
    {
        Free(sp->buf);
    }
    /* $end sbuf_deinit */
    
    /* Insert item onto the rear of shared buffer sp */
    /* $begin sbuf_insert */
    void sbuf_insert(sbuf_t *sp, int item)
    {
        P(&sp->slots);                          /* Wait for available slot */
        P(&sp->mutex);                          /* Lock the buffer */
        sp->buf[(++sp->rear)%(sp->n)] = item;   /* Insert the item */
        V(&sp->mutex);                          /* Unlock the buffer */
        V(&sp->items);                          /* Announce available item */
    }
    /* $end sbuf_insert */
    
    /* Remove and return the first item from buffer sp */
    /* $begin sbuf_remove */
    int sbuf_remove(sbuf_t *sp)
    {
        int item;
        P(&sp->items);                          /* Wait for available item */
        P(&sp->mutex);                          /* Lock the buffer */
        item = sp->buf[(++sp->front)%(sp->n)];  /* Remove the item */
        V(&sp->mutex);                          /* Unlock the buffer */
        V(&sp->slots);                          /* Announce available slot */
        return item;
    }
    /* $end sbuf_remove */
    /* $end sbufc */

    2. 读者-写者问题

    读者-写者问题在现实系统中也比较常见。例如,一个在线影院座位预定系统中,允许有无限多个客户同时查看(读者)座位分配,但是正在预定的客户必须拥有对数据库的独占访问(写者)。读者写者问题又分为以下几种情况:

    • 读者优先,即除非有写者正在写,否则不能让读者等;
    • 写者优先,即只要写者准备好写,就尽快完成写。在写者发出写请求后到达的读者,必须等待;

    下文给出读者优先的一个示例:

    /*全局变量*/
    int readcnt;	//统计当前在临界区中读者的数量
    sem_t mutex;	//保护对readcnt的访问
    sem_t w;		//控制对访问共享对象的临界区的访问
    
    void reader(void)
    {
        while(1){
            P(&mutex);
            readcnt++;
            if(readcnt == 1)	//first in
                P(&w);
            V(&mutex);
            
            /* 临界区操作语句 */
            /* 读语句 */
            
            P(&mutex);
            readcnt--;
            if(readcnt == 0)	//last out
                V(&w);
            V(&mutex);
        }
    }
    
    void writer(void)
    {
        while(1){
            P(&w);
            
            /* 临界区操作语句 */
            /* 写语句 */
            
            V(&w);
        }
    }
    • 为了保证任意时刻临界区中只有一个写者,每当一个写者进入临界区时,它对互斥锁w加锁,每当它离开临界区时,对w解锁;

    • 为了保证只要还有一个读者占用互斥锁w,那么无限多的读者就可以无障碍的进入临界区读,只有第一个进入临界区的读者对w加锁,只有最后一个离开的读者对w解锁。

      此法可能导致饥饿(starvation):如果有读者不断到达,写者就无限期等待。

      3. 基于预线程化的并发服务器

      前文叙述了如何使用信号量来访问共享变量和调度对共享资源的访问,现在可以动手实现一个基于预线程化的技术(prethreading)的并发服务器开发。

    如图所示,服务器是由一个主线程和一组工作者线程构成的。主线程不断接受来自客户端的连接请求,并将得到的连接描述符放在一个缓冲区中。每一个工作者线程反复地从共享缓冲区中取出描述符为客户端服务,然后等待下一个描述符。

    下面给出具体代码:

    #include "csapp.h"
    #include "sbuf.h"
    #define NTHREADS  4
    #define SBUFSIZE  16
    
    void echo_cnt(int connfd);
    void *thread(void *vargp);
    
    sbuf_t sbuf; /* Shared buffer of connected descriptors */
    
    int main(int argc, char **argv) 
    {
        int i, listenfd, connfd;
        socklen_t clientlen;
        struct sockaddr_storage clientaddr;
        pthread_t tid; 
    
        if (argc != 2) {
    	fprintf(stderr, "usage: %s <port>\n", argv[0]);
    	exit(0);
        }
        listenfd = Open_listenfd(argv[1]);
    
        sbuf_init(&sbuf, SBUFSIZE); //line:conc:pre:initsbuf
        for (i = 0; i < NTHREADS; i++)  /* Create worker threads */ 
    	Pthread_create(&tid, NULL, thread, NULL);               
        while (1) { 
            clientlen = sizeof(struct sockaddr_storage);
            connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);
            sbuf_insert(&sbuf, connfd); /* Insert connfd in buffer */
        }
    }
    
    void *thread(void *vargp) 
    {  
        Pthread_detach(pthread_self()); 
        while (1) { 
    	int connfd = sbuf_remove(&sbuf); /* Remove connfd from buffer */ 
    	echo_cnt(connfd);                /* Service client */
    	Close(connfd);
        }
    }
    • 首先初始化缓冲区sbuf(line 24)后,主线程创建了一组工作者线程(line 25~26)。
    • 之后进入无限循环,接受连接请求,并将得到的已连接描述符插入缓冲区sbuf中。
    • 每个工作者线程的行为非常简单,它等待直到能从缓冲区中取出一个已连接描述符(line 39),然后调用echo_cnt函数回送客户端的输入。

    下面给出echo_cnt函数的代码,它向你展示了一个从线程例程调用的初始化程序包的一般技术。其中全局变量byte_cnt中记录了从所有客户端接受到的累计字节数。

    #define RIO_BUFSIZE 8192
    typedef struct {
        int rio_fd;                /* Descriptor for this internal buf */
        int rio_cnt;               /* Unread bytes in internal buf */
        char *rio_bufptr;          /* Next unread byte in internal buf */
        char rio_buf[RIO_BUFSIZE]; /* Internal buffer */
    } rio_t;
    
    static int byte_cnt;  /* Byte counter */
    static sem_t mutex;   /* and the mutex that protects it */
    
    static void init_echo_cnt(void)
    {
        Sem_init(&mutex, 0, 1);
        byte_cnt = 0;
    }
    
    void echo_cnt(int connfd) 
    {
        int n; 
        char buf[MAXLINE]; 
        rio_t rio;
        static pthread_once_t once = PTHREAD_ONCE_INIT;
    
        Pthread_once(&once, init_echo_cnt); 
        Rio_readinitb(&rio, connfd);        
        while((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
            P(&mutex);
            byte_cnt += n; //line:conc:pre:cntaccess1
            printf("server received %d (%d total) bytes on fd %d\n", 
                   n, byte_cnt, connfd); 
            V(&mutex);
            Rio_writen(connfd, buf, n);
        }
    }
    
    void rio_readinitb(rio_t *rp, int fd) 
    {
        rp->rio_fd = fd;  
        rp->rio_cnt = 0;  
        rp->rio_bufptr = rp->rio_buf;
    }
    
    ssize_t rio_readlineb(rio_t *rp, void *usrbuf, size_t maxlen) 
    {
        int n, rc;
        char c, *bufp = usrbuf;
        for (n = 1; n < maxlen; n++) { 
            if ((rc = rio_read(rp, &c, 1)) == 1) {
                *bufp++ = c;
                if (c == '\n') {
                    n++;
                    break;
                }
            } else if (rc == 0) {
    	    	if (n == 1)
    				return 0; /* EOF, no data read */
    	   		else
    				break;    /* EOF, some data was read */
    		} else
    	    	return -1;	  /* Error */
        }
        *bufp = 0;
        return n-1;
    }
    
    ssize_t rio_writen(int fd, void *usrbuf, size_t n) 
    {
        size_t nleft = n;
        ssize_t nwritten;
     	char *bufp = usrbuf;
        while (nleft > 0) {
            if ((nwritten = write(fd, bufp, nleft)) <= 0) {
                if (errno == EINTR)  /* Interrupted by sig handler return */
                    nwritten = 0;    /* and call write() again */
                else
                    return -1;       /* errno set by write() */
            }
            nleft -= nwritten;
            bufp += nwritten;
        }
        return n;
    }
    • 首先初始化byte_cnt计数器和mutex信号量;
    • 一种是显式地调用一个初始化函数,一种是上文所采取的利用pthread_once函数。即当第一次有某个线程调用echo_cnt函数时,使用pthread_once函数去调用初始化函数。这个方法的优点是使程序包的使用更加容易,缺点使每一次调用echo_cnt函数都会导致调用pthread_once函数,而除了第一次,它没有做什么有用的事。
      首先初始化byte_cnt计数器和mutex信号量;
    • 一种是显式地调用一个初始化函数,一种是上文所采取的利用pthread_once函数。即当第一次有某个线程调用echo_cnt函数时,使用pthread_once函数去调用初始化函数。这个方法的优点是使程序包的使用更加容易,缺点使每一次调用echo_cnt函数都会导致调用pthread_once函数,而除了第一次,它没有做什么有用的事。
    • 一旦程序包被初始化,echo_cnt函数会初始化RIO带缓冲区的I/O包(line 20),然后回送从客户端接收到的每一个文本行。

    获取更多知识,请点击关注:
    嵌入式Linux&ARM
    CSDN博客
    简书博客
    知乎专栏


    展开全文
  • 生产者-消费者问题1 实验内容及要求2 实验环境3 实验设计3.1 问题描述3.2 基本思想3.2.1 生产者线程3.2.2 消费者线程3.2.3 同步的实现3.3 数据结构4 实验源码5 实验总结 1 实验内容及要求 1、模拟生产者消费者问题...
  • 生产者-消费者C++实现(一)

    千次阅读 2019-06-03 15:21:48
    和同学闲聊,谈到多线程中的经典问题——生产者-消费者问题:要求实现两个线程,一个线程负责对全局变量进行+1操作,一个线程负责打印更新后的值。自己从事code多年,自以为对多线程了解深入,不假思索,写出了下面...
  • 一、问题描述 信号量机制实现进程互斥的步骤: 设置初值为1的互斥信号量 在访问临界区之间进行P操作 在访问完临界区之后进行V操作 信号量机制实现进程同步的步骤: 设置初值为0的同步信号量 ...生产者-消费者问题中
  • 什么是阻塞队列? 阻塞队列是一个在队列基础上又支持了两个附加...阻塞队列常用于生产者消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。简而言之,阻塞队列是生产者用来存放元素...
  • 生产者-消费者问题

    千次阅读 2018-03-28 19:04:43
    问题基本描述生产者消费者问题是一个著名的进程同步的问题。它描述的是:有一群生产者进程在生产产品,并将这些产品提供给消费者进程去消费。为使生产者进程与消费者进程能并发执行,在两者之间设置了一个具有n个...
  • Java实现生产者-消费者问题解决方案实验
  • 操作系统,生产者-消费者问题详解

    千次阅读 2020-04-15 10:56:05
    实现进程的前驱关系生产者-消费者问题 信号量机制 1. 实现进程互斥 分析问题,确定临界区 设置互斥信号量,初值为1 临界区之前对信号量执行P操作 临界区之后对信号量执行V操作 2. 实现进程同步 分析问题,找出...
  • 在进程同步中,经典的同步问题有:生产者-消费者问题、读者-写者问题、哲学家进餐问题。 一、生产者消费者问题: 问题描述:使用一个缓冲区来保存物品,只有缓冲区没有满,生产者才可以放入物品;只有缓冲区不为...
  • 一、生产者-消费者问题 问题描述: 一组生产者进程和一组消费者进程共享一个初始为空、大小为n的缓冲区 只有缓冲区没满时,生产者才能把产品放入缓冲区,否则必须等待 只有缓冲区不为空,消费者才能从中取出产品...
  • 信号量解决生产者-消费者问题

    千次阅读 2018-03-24 13:52:15
    单缓冲区生产者-消费者问题 int B; semaphore emoty; /*可用的空缓冲区数*/ semaphore full; /*缓冲区内可用的产品数*/ empty = 1; /*缓冲区内允许放入一件产品*/ full = 0; /*缓冲区内没有产品*/ cobegin process...
  • 苹果-桔子问题是操作系统中P、V操作部分经典的问题,属于复杂一点的生产者-消费者问题,可以抽象的理解为两个生产者和两个消费者被连接到大小为1的缓冲区上。 #实现语言:C# #问题描述: 桌子上有一只盘子,每次...
  • 使用管程实现生产者-消费者问题

    千次阅读 2019-11-10 17:32:08
    使用信号量机制实现的生产者消费者问题需要客户端代码做很多控制,而管程把控制的代码独立出来,不仅不容易出错,也使得客户端代码调用更容易。 c 语言不支持管程,下面的示例代码使用了类 Pascal 语言来描述管程。...
  • 生产者-消费者模型的Java实现

    千次阅读 2016-07-08 20:57:32
    生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息...
  • Windows下实现生产者-消费者模型

    千次阅读 2019-04-11 23:47:17
    在 windows 环境下,利用高级语言编程环境(限定为 VS 环境或 VC 环境)调用 CreateThread 函数和相关的同步函数,模拟实现“生产者-消费者”问题。“生产者-消费者”模拟实验的具体要求见后面附件。  定义全局...
  • 1.问题描述:生产者-消费者模型描述的是有一群生产者进程在生产产品,并将这些产品提供给消费者进程并发进行,具备并发程序的典型特征。PCM为使生产者进程和消费者进程并发进行,在它们之间设置一个具有多个缓冲区的...
  • 生产者-消费者问题(1)问题描述(2)问题分析(3)如何实现?(4)实现互斥的P操作一定要在实现同步的P操作之后()5![在这里插入图片描述]...
  • 生产者消费者问题 基本原理 在 Linux 操作系统用 C 实现经典同步问题:生产者—消费者,具体要求如下: ① 一个大小为 10 的缓冲区,初始状态为空。 ② 2个生产者,随机等待一段时间,往缓冲区中添加数据,若缓冲区...
  • 生产者-消费者问题的几种解决方法

    千次阅读 2018-07-07 20:05:15
    参考:...消费者则一次消费一段数据(将其从缓存中移除),问题的核心就是要保证不让生产者在缓存还是满的时候仍要向缓存写数据,不让消费者试图从空的缓存中取出数据。解...
  • 问题描述:生产者-消费者问题,也叫做缓存绑定问题(bounded-buffer),是一个多进程同步问题。 即有两个进程:制造少和消费者,共享一个固定大小的缓存 制造商的工作是制造一段数据,放进缓存,如此重复。 消费者...
  • 你好,我想用VC实现生产者-消费者问题。其中包括详细介绍以及程序,及程序最后结果。
  • 生产者-消费者问题及C语言实现

    千次阅读 2017-07-10 16:07:54
    生产者-消费者问题是最著名的进程同步问题。它描述了一组生产者与一组消费者,它们共享一个有界缓冲池,生产者向池中投入产品,消费者从池中取得产品。假定缓冲池中有 N 个缓冲区,每个缓冲区只能存放一个类型为 ...
  • 生产者-消费者问题实际上是相互合作进程关系的一种抽象。(提供数据的是生产者,使用数据的是消费者) 制约关系。不允许消费者进程到一个空缓冲区(不是没有数据,是数据无效(垃圾数据))中...
  • 生产者消费者对缓冲区互斥访问是互斥关系,同时生产者消费者又是一个相互协作的关系,只有生产者生产之后,消费者只能才能消费,它们还是同步关系。 (2)、整理思路。只有生产生产者消费者进程,正好是这两个...
  • 生产者-消费者模式的核心组件是共享内存缓冲区,它的作用是生产者消费者之间的通信桥梁,避免二者之间的直接通信,有效地降低了二者耦合性。生产者不需要知道消费者的存在,消费者也不需要知...
  • 生产者-消费者问题是一个经典的进程同步问题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制。在同一个进程地址空间内执行的两个线程生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 324,924
精华内容 129,969
关键字:

生产者-消费者