精华内容
下载资源
问答
  • 【操作系统】生产者消费者问题

    万次阅读 多人点赞 2018-08-11 00:43:20
    生产者消费者模型 生产者消费者模型 一、 生产者消费者问题 二、 问题分析 三、 伪代码实现 四、代码实现(C++) 五、 互斥锁与条件变量的使用比较 一、 生产者消费者问题 生产者消费者问题...

    生产者消费者模型

    一、 生产者消费者问题

    生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
    .
    要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

    这里写图片描述


    二、 问题分析

    该问题需要注意的几点:

    • 在缓冲区为空时,消费者不能再进行消费
    • 在缓冲区为满时,生产者不能再进行生产
    • 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步
    • 注意条件变量与互斥锁的顺序

    这里写图片描述
    由于前两点原因,因此需要保持线程间的同步,即一个线程消费(或生产)完,其他线程才能进行竞争CPU,获得消费(或生产)的机会。对于这一点,可以使用条件变量进行线程间的同步:生产者线程在product之前,需要wait直至获取自己所需的信号量之后,才会进行product的操作;同样,对于消费者线程,在consume之前需要wait直到没有线程在访问共享区(缓冲区),再进行consume的操作,之后再解锁并唤醒其他可用阻塞线程。

    这里写图片描述
    在访问共享区资源时,为避免多个线程同时访问资源造成混乱,需要对共享资源加锁,从而保证某一时刻只有一个线程在访问共享资源。


    三、 伪代码实现

    假设缓冲区大小为10,生产者、消费者线程若干。生产者和消费者相互等效,只要缓冲池未满,生产者便可将消息送入缓冲池;只要缓冲池未空,消费者便可从缓冲池中取走一个消息。

    • items代表缓冲区已经使用的资源数,spaces代表缓冲区可用资源数
    • mutex代表互斥锁
    • buf[10] 代表缓冲区,其内容类型为item
    • in、out代表第一个资源和最后一个资源
    var items = 0, space = 10, mutex = 1;
    var in = 0, out = 0;
    item buf[10] = { NULL };
    
    producer {
        while( true ) {
            wait( space );  // 等待缓冲区有空闲位置, 在使用PV操作时,条件变量需要在互斥锁之前
            wait( mutex );  // 保证在product时不会有其他线程访问缓冲区
    
            // product
            buf.push( item, in );  // 将新资源放到buf[in]位置 
            in = ( in + 1 ) % 10;
            
            signal( mutex );  // 唤醒的顺序可以不同
            signal( items );  // 通知consumer缓冲区有资源可以取走
        }
    }
    
    consumer {
        while( true ) {
            wait( items );  // 等待缓冲区有资源可以使用
            wait( mutex );  // 保证在consume时不会有其他线程访问缓冲区
    
            // consume
            buf.pop( out );  // 将buf[out]位置的的资源取走
            out = ( out + 1 ) % 10;
    
            signal( mutex );  // 唤醒的顺序可以不同
            signal( space );  // 通知缓冲区有空闲位置
        }
    }
    

    不能将线程里两个wait的顺序调换否则会出现死锁。例如(调换后),将consumer的两个wait调换,在producer发出signal信号后,如果producer线程此时再次获得运行机会,执行完了wait(space),此时,另一个consumer线程获得运行机会,执行了 wait(mutex) ,如果此时缓冲区为空,那么consumer将会阻塞在wait(items),而producer也会因为无法获得锁的所有权所以阻塞在wait(mutex),这样两个线程都在阻塞,也就造成了死锁。


    四、代码实现(C++)

    #include <iostream>
    #include <string.h>
    #include <pthread.h>
    #include <unistd.h>
    using namespace std;
    
    int current = 0;  // producer运行加1,consumer运行减1
    int buf[10];
    int in = 0, out = 0;
    int items = 0, spaces = 10;
    bool flag;  // 标记线程结束运行
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t notfull = PTHREAD_COND_INITIALIZER;  // 缓冲区不满
    pthread_cond_t notempty = PTHREAD_COND_INITIALIZER;  // 缓冲区不空
    
    void *producer( void *arg ) {
        while( flag ) {
            pthread_mutex_lock( &mutex );  // 为保证条件变量不会因为多线程混乱,所以先加锁
            while( !spaces ) {  // 避免“惊群”效应,避免因其他线程实现得到事件而导致该线程“假醒”
                pthread_cond_wait( &notfull, &mutex );
            }
            buf[in] = current++;
            in = ( in + 1 ) % 10;
            items++;
            spaces--;
    
            printf( "producer %zu , current = %d\n", pthread_self(), current );
            for( int i = 0; i < 10; i++ ) {
                printf( "%-4d", buf[i] );
            }
            printf( "\n\n" );
    
            pthread_cond_signal( &notempty );
            pthread_mutex_unlock( &mutex );
        }
        pthread_exit( NULL );
    }
    
    void *consumer( void *arg ) {
        while( flag ) {
            pthread_mutex_lock( &mutex );
            while( !items ) {
                pthread_cond_wait( &notempty, &mutex );
            }
            buf[out] = -1;
            out = ( out + 1 ) % 10;
            current--;
            items--;
            spaces++;
    
            printf( "consumer %zu , current = %d\n", pthread_self(), current );
            for( int i = 0; i < 10; i++ ) {
                printf( "%-4d", buf[i] );
            }
            printf( "\n\n" );
    
            pthread_cond_signal( &notfull );
            pthread_mutex_unlock( &mutex );
        }
        pthread_exit( NULL );
    }
    
    int main() {
        memset( buf, -1, sizeof(buf) );
        flag = true;
        pthread_t pro[10], con[10];
        int i = 0;
    
        for( int i = 0; i < 10; i++ ) {
            pthread_create( &pro[i], NULL, producer, NULL );
            pthread_create( &con[i], NULL, consumer, NULL );
        }
    
        sleep(1);  // 让线程运行一秒
        flag = false;
    
        for( int i = 0; i < 10; i++ ) {
            pthread_join( pro[i], NULL );
            pthread_join( con[i], NULL );
        }
    
        return 0;
    } 
    

    五、 互斥锁与条件变量的使用比较

    我们会发现,在伪代码中强调了条件变量在前,互斥锁在后,而到了代码实现时又变成了先加互斥锁,再进行循环pthread_cond_wait()。这不是自相矛盾吗?

    其实,在伪代码中的wait()signal()就是操作系统中的PV操作,而PV操作定义就保证了该语句是原子操作,因此在wait条件变量改变的时候不会因为多进程同时访问共享资源造成混乱,所以为了保证线程间的同步,需要先加条件变量,等事件可使用后才进行线程相应的操作,此时互斥锁的作用是保证共享资源不会被其他线程访问。

    而在代码实现中,signal()对应的时pthread_cond_wait()函数,该函数在执行时会有三步:

    • 解开当前的锁
    • 等待条件变量达到所需要的状态
    • 再把之前解开的锁加锁

    为了实现将pthread_cond_wait()变成原子操作,就需要在该函数之前添加互斥锁。因为pthread_cond_wait()可以解锁,也就不会发生像伪代码所说的死锁问题。相反,如果像伪代码那样先使用条件变量,后加锁,则会造成多个线程同时访问共享资源的问题,造成数据的混乱。


    欢迎关注微信公众号,不定时分享学习资料与学习笔记,感谢!
    在这里插入图片描述

    展开全文
  • Java多种方式解决生产者消费者问题(十分详细)

    万次阅读 多人点赞 2018-08-16 08:40:50
    生产者消费者问题 一、问题描述 生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,...

    一、问题描述

    生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

    示意图:
    生产者消费者


    二、解决方法

    思路

    1. 采用某种机制保护生产者和消费者之间的同步。有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。

    2. 在生产者和消费者之间建立一个管道。管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

    解决问题的核心

       保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。

    Java能实现的几种方法

    1. wait() / notify()方法

    2. await() / signal()方法

    3. BlockingQueue阻塞队列方法

    4. 信号量

    5. 管道


    三、代码实现

    1. wait() / notify()方法

    当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;
    当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。

    当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;
    当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

    仓库Storage.java

    import java.util.LinkedList;
    
    public class Storage {
    
        // 仓库容量
        private final int MAX_SIZE = 10;
        // 仓库存储的载体
        private LinkedList<Object> list = new LinkedList<>();
    
        public void produce() {
            synchronized (list) {
                while (list.size() + 1 > MAX_SIZE) {
                    System.out.println("【生产者" + Thread.currentThread().getName()
    		                + "】仓库已满");
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.add(new Object());
                System.out.println("【生产者" + Thread.currentThread().getName()
                        + "】生产一个产品,现库存" + list.size());
                list.notifyAll();
            }
        }
    
        public void consume() {
            synchronized (list) {
                while (list.size() == 0) {
                    System.out.println("【消费者" + Thread.currentThread().getName() 
    						+ "】仓库为空");
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.remove();
                System.out.println("【消费者" + Thread.currentThread().getName()
                        + "】消费一个产品,现库存" + list.size());
                list.notifyAll();
            }
        }
    }
    

    生产者

    public class Producer implements Runnable{
        private Storage storage;
    
        public Producer(){}
    
        public Producer(Storage storage){
            this.storage = storage;
        }
    
        @Override
        public void run(){
            while(true){
                try{
                    Thread.sleep(1000);
                    storage.produce();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
    

    消费者

    public class Consumer implements Runnable{
        private Storage storage;
    
        public Consumer(){}
    
        public Consumer(Storage storage){
            this.storage = storage;
        }
    
        @Override
        public void run(){
            while(true){
                try{
                    Thread.sleep(3000);
                    storage.consume();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
    

    主函数

    public class Main {
    
        public static void main(String[] args) {
            Storage storage = new Storage();
            Thread p1 = new Thread(new Producer(storage));
            Thread p2 = new Thread(new Producer(storage));
            Thread p3 = new Thread(new Producer(storage));
    
            Thread c1 = new Thread(new Consumer(storage));
            Thread c2 = new Thread(new Consumer(storage));
            Thread c3 = new Thread(new Consumer(storage));
    
            p1.start();
            p2.start();
            p3.start();
            c1.start();
            c2.start();
            c3.start();
        }
    }
    

    运行结果

    【生产者p1】生产一个产品,现库存1
    【生产者p2】生产一个产品,现库存2
    【生产者p3】生产一个产品,现库存3
    【生产者p1】生产一个产品,现库存4
    【生产者p2】生产一个产品,现库存5
    【生产者p3】生产一个产品,现库存6
    【生产者p1】生产一个产品,现库存7
    【生产者p2】生产一个产品,现库存8
    【消费者c1】消费一个产品,现库存7
    【生产者p3】生产一个产品,现库存8
    【消费者c2】消费一个产品,现库存7
    【消费者c3】消费一个产品,现库存6
    【生产者p1】生产一个产品,现库存7
    【生产者p2】生产一个产品,现库存8
    【生产者p3】生产一个产品,现库存9
    【生产者p1】生产一个产品,现库存10
    【生产者p2】仓库已满
    【生产者p3】仓库已满
    【生产者p1】仓库已满
    【消费者c1】消费一个产品,现库存9
    【生产者p1】生产一个产品,现库存10
    【生产者p3】仓库已满
    。。。。。。以下省略
    

    一个生产者线程运行produce方法,睡眠1s;一个消费者运行一次consume方法,睡眠3s。此次实验过程中,有3个生产者和3个消费者,也就是我们说的多对多的情况。仓库的容量为10,可以看出消费的速度明显慢于生产的速度,符合设定。

    注意:

    notifyAll()方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。

    2. await() / signal()方法

    在JDK5中,用ReentrantLock和Condition可以实现等待/通知模型,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

    在这里只需改动Storage类

    import java.util.LinkedList;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Storage {
    
        // 仓库最大存储量
        private final int MAX_SIZE = 10;
        // 仓库存储的载体
        private LinkedList<Object> list = new LinkedList<Object>();
        // 锁
        private final Lock lock = new ReentrantLock();
        // 仓库满的条件变量
        private final Condition full = lock.newCondition();
        // 仓库空的条件变量
        private final Condition empty = lock.newCondition();
    
        public void produce()
        {
            // 获得锁
            lock.lock();
            while (list.size() + 1 > MAX_SIZE) {
                System.out.println("【生产者" + Thread.currentThread().getName()
    		             + "】仓库已满");
                try {
                    full.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(new Object());
            System.out.println("【生产者" + Thread.currentThread().getName() 
    				 + "】生产一个产品,现库存" + list.size());
    
            empty.signalAll();
            lock.unlock();
        }
    
        public void consume()
        {
            // 获得锁
            lock.lock();
            while (list.size() == 0) {
                System.out.println("【消费者" + Thread.currentThread().getName()
    		             + "】仓库为空");
                try {
                    empty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.remove();
            System.out.println("【消费者" + Thread.currentThread().getName()
    		         + "】消费一个产品,现库存" + list.size());
    
            full.signalAll();
            lock.unlock();
        }
    }
    

    运行结果与wait()/notify()类似

    3. BlockingQueue阻塞队列方法

    BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

    put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
    take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Storage {
    
        // 仓库存储的载体
        private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10);
    
        public void produce() {
            try{
                list.put(new Object());
                System.out.println("【生产者" + Thread.currentThread().getName()
                        + "】生产一个产品,现库存" + list.size());
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    
        public void consume() {
            try{
                list.take();
                System.out.println("【消费者" + Thread.currentThread().getName()
                        + "】消费了一个产品,现库存" + list.size());
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
    

    可能会出现put()或take()和System.out.println()输出不匹配的情况,是由于它们之间没有同步造成的。BlockingQueue可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。

    4. 信号量

    Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行。)。

    import java.util.LinkedList;
    import java.util.concurrent.Semaphore;
    
    public class Storage {
    
        // 仓库存储的载体
        private LinkedList<Object> list = new LinkedList<Object>();
    	// 仓库的最大容量
        final Semaphore notFull = new Semaphore(10);
        // 将线程挂起,等待其他来触发
        final Semaphore notEmpty = new Semaphore(0);
        // 互斥锁
        final Semaphore mutex = new Semaphore(1);
    
        public void produce()
        {
            try {
                notFull.acquire();
                mutex.acquire();
                list.add(new Object());
                System.out.println("【生产者" + Thread.currentThread().getName()
                        + "】生产一个产品,现库存" + list.size());
            }
            catch (Exception e) {
                e.printStackTrace();
            } finally {
                mutex.release();
                notEmpty.release();
            }
        }
    
        public void consume()
        {
            try {
                notEmpty.acquire();
                mutex.acquire();
                list.remove();
                System.out.println("【消费者" + Thread.currentThread().getName()
                        + "】消费一个产品,现库存" + list.size());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mutex.release();
                notFull.release();
            }
        }
    }
    

    5. 管道

    一种特殊的流,用于不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。

    inputStream.connect(outputStream)或outputStream.connect(inputStream)作用是使两个Stream之间产生通信链接,这样才可以将数据进行输出与输入。

    这种方式只适用于两个线程之间通信,不适合多个线程之间通信。

    1. PipedInputStream / PipedOutputStream (操作字节流)

    Producer

    import java.io.IOException;
    import java.io.PipedOutputStream;
    
    public class Producer implements Runnable {
    
        private PipedOutputStream pipedOutputStream;
    
        public Producer() {
            pipedOutputStream = new PipedOutputStream();
        }
    
        public PipedOutputStream getPipedOutputStream() {
            return pipedOutputStream;
        }
    
        @Override
        public void run() {
            try {
                for (int i = 1; i <= 5; i++) {
                    pipedOutputStream.write(("This is a test, Id=" + i + "!\n").getBytes());
                }
                pipedOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    Consumer

    import java.io.IOException;
    import java.io.PipedInputStream;
    
    public class Consumer implements Runnable {
        private PipedInputStream pipedInputStream;
    
        public Consumer() {
            pipedInputStream = new PipedInputStream();
        }
    
        public PipedInputStream getPipedInputStream() {
            return pipedInputStream;
        }
    
        @Override
        public void run() {
            int len = -1;
            byte[] buffer = new byte[1024];
            try {
                while ((len = pipedInputStream.read(buffer)) != -1) {
                    System.out.println(new String(buffer, 0, len));
                }
                pipedInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    主函数

    import java.io.IOException;
    
    public class Main {
    
        public static void main(String[] args) {
            Producer p = new Producer();
            Consumer c = new Consumer();
            Thread t1 = new Thread(p);
            Thread t2 = new Thread(c);
            try {
                p.getPipedOutputStream().connect(c.getPipedInputStream());
                t2.start();
                t1.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    2. PipedReader / PipedWriter (操作字符流)

    Producer

    import java.io.IOException;
    import java.io.PipedWriter;
    
    public class Producer implements Runnable {
    
        private PipedWriter pipedWriter;
    
        public Producer() {
            pipedWriter = new PipedWriter();
        }
    
        public PipedWriter getPipedWriter() {
            return pipedWriter;
        }
    
        @Override
        public void run() {
            try {
                for (int i = 1; i <= 5; i++) {
                    pipedWriter.write("This is a test, Id=" + i + "!\n");
                }
                pipedWriter.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    Consumer

    import java.io.IOException;
    import java.io.PipedReader;
    
    public class Consumer implements Runnable {
        private PipedReader pipedReader;
    
        public Consumer() {
            pipedReader = new PipedReader();
        }
    
        public PipedReader getPipedReader() {
            return pipedReader;
        }
    
        @Override
        public void run() {
            int len = -1;
            char[] buffer = new char[1024];
            try {
                while ((len = pipedReader.read(buffer)) != -1) {
                    System.out.println(new String(buffer, 0, len));
                }
                pipedReader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    主函数

    import java.io.IOException;
    
    public class Main {
    
        public static void main(String[] args) {
            Producer p = new Producer();
            Consumer c = new Consumer();
            Thread t1 = new Thread(p);
            Thread t2 = new Thread(c);
            try {
                p.getPipedWriter().connect(c.getPipedReader());
                t2.start();
                t1.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    想查看上面几种方式的完整代码,请点击这里:生产者消费者问题的一些实验


    参考

    《Java多线程编程核心技术》 高洪岩
    生产者/消费者问题的多种Java实现方式
    Producer–consumer problem – Wikipedia
    Semaphore的一种使用方法
    Semaphore实现的生产者消费者程序

    展开全文
  • 秒杀多线程第十篇 生产者消费者问题

    万次阅读 多人点赞 2012-05-21 10:18:09
    继经典线程同步问题之后,我们来看看生产者消费者问题及读者写者问题。生产者消费者问题是一个著名的线程同步问题,该问题描述如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和...

        继经典线程同步问题之后,我们来看看生产者消费者问题及读者写者问题。生产者消费者问题是一个著名的线程同步问题,该问题描述如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将它生产的产品放入一个缓冲区中,消费者可以从缓冲区中取走产品进行消费,显然生产者和消费者之间必须保持同步,即不允许消费者到一个空的缓冲区中取产品,也不允许生产者向一个已经放入产品的缓冲区中再次投放产品。

        这个生产者消费者题目不仅常用于操作系统的课程设计,也常常在程序员和软件设计师考试中出现。并且在计算机考研的专业课考试中也是一个非常热门的问题。因此现在就针对这个问题进行详细深入的解答。

     

        首先来简化问题,先假设生产者和消费者都只有一个,且缓冲区也只有一个。这样情况就简便多了。

        第一.从缓冲区取出产品和向缓冲区投放产品必须是互斥进行的。可以用关键段互斥量来完成。

        第二.生产者要等待缓冲区为空,这样才可以投放产品,消费者要等待缓冲区不为空,这样才可以取出产品进行消费。并且由于有二个等待过程,所以要用二个事件信号量来控制。

        考虑这二点后,代码很容易写出来。另外为了美观起见,将消费者的输出颜色设置为彩色,有关如何在控制台下设置彩色输出请参阅《VC 控制台颜色设置》。

    //1生产者 1消费者 1缓冲区
    //使用二个事件,一个表示缓冲区空,一个表示缓冲区满。
    //再使用一个关键段来控制缓冲区的访问
    #include <stdio.h>
    #include <process.h>
    #include <windows.h>
    //设置控制台输出颜色
    BOOL SetConsoleColor(WORD wAttributes)
    {
    	HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
    	if (hConsole == INVALID_HANDLE_VALUE)
    		return FALSE;	
    	return SetConsoleTextAttribute(hConsole, wAttributes);
    }
    const int END_PRODUCE_NUMBER = 10;   //生产产品个数
    int g_Buffer;                        //缓冲区
    //事件与关键段
    CRITICAL_SECTION g_cs;
    HANDLE g_hEventBufferEmpty, g_hEventBufferFull;
    //生产者线程函数
    unsigned int __stdcall ProducerThreadFun(PVOID pM)
    {
    	for (int i = 1; i <= END_PRODUCE_NUMBER; i++)
    	{
    		//等待缓冲区为空
    		WaitForSingleObject(g_hEventBufferEmpty, INFINITE);
    
    		//互斥的访问缓冲区
    		EnterCriticalSection(&g_cs);
    		g_Buffer = i;
    		printf("生产者将数据%d放入缓冲区\n", i);
    		LeaveCriticalSection(&g_cs);
    		
    		//通知缓冲区有新数据了
    		SetEvent(g_hEventBufferFull);
    	}
    	return 0;
    }
    //消费者线程函数
    unsigned int __stdcall ConsumerThreadFun(PVOID pM)
    {
    	volatile bool flag = true;
    	while (flag)
    	{
    		//等待缓冲区中有数据
    		WaitForSingleObject(g_hEventBufferFull, INFINITE);
    		
    		//互斥的访问缓冲区
    		EnterCriticalSection(&g_cs);
    		SetConsoleColor(FOREGROUND_GREEN);
    		printf("  消费者从缓冲区中取数据%d\n", g_Buffer);
    		SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE);
    		if (g_Buffer == END_PRODUCE_NUMBER)
    			flag = false;
    		LeaveCriticalSection(&g_cs);
    		
    		//通知缓冲区已为空
    		SetEvent(g_hEventBufferEmpty);
    
    		Sleep(10); //some other work should to do
    	}
    	return 0;
    }
    int main()
    {
    	printf("  生产者消费者问题   1生产者 1消费者 1缓冲区\n");
    	printf(" -- by MoreWindows( http://blog.csdn.net/MoreWindows ) --\n\n");
    
    	InitializeCriticalSection(&g_cs);
    	//创建二个自动复位事件,一个表示缓冲区是否为空,另一个表示缓冲区是否已经处理
    	g_hEventBufferEmpty = CreateEvent(NULL, FALSE, TRUE, NULL);
    	g_hEventBufferFull = CreateEvent(NULL, FALSE, FALSE, NULL);
    	
    	const int THREADNUM = 2;
    	HANDLE hThread[THREADNUM];
    	
    	hThread[0] = (HANDLE)_beginthreadex(NULL, 0, ProducerThreadFun, NULL, 0, NULL);
    	hThread[1] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    	WaitForMultipleObjects(THREADNUM, hThread, TRUE, INFINITE);
    	CloseHandle(hThread[0]);
    	CloseHandle(hThread[1]);
    	
    	//销毁事件和关键段
    	CloseHandle(g_hEventBufferEmpty);
    	CloseHandle(g_hEventBufferFull);
    	DeleteCriticalSection(&g_cs);
    	return 0;
    }

    运行结果如下所示:

    可以看出生产者与消费者已经是有序的工作了。

     

        然后再对这个简单生产者消费者问题加大难度。将消费者改成2个,缓冲池改成拥有4个缓冲区的大缓冲池。

        如何来思考了这个问题了?首先根据上面分析的二点,可以知道生产者和消费者由一个变成多个的影响不大,唯一要注意的是缓冲池变大了,回顾一下《秒杀多线程第八篇 经典线程同步 信号量Semaphore》中的信号量,不难得出用二个信号量就可以解决这种缓冲池有多个缓冲区的情况——用一个信号量A来记录为空的缓冲区个数,另一个信号量B记录非空的缓冲区个数,然后生产者等待信号量A,消费者等待信号量B就可以了。因此可以仿照上面的代码来实现复杂生产者消费者问题,示例代码如下:

    //1生产者 2消费者 4缓冲区
    #include <stdio.h>
    #include <process.h>
    #include <windows.h>
    //设置控制台输出颜色
    BOOL SetConsoleColor(WORD wAttributes)
    {
    	HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
    	if (hConsole == INVALID_HANDLE_VALUE)
    		return FALSE;
    	
    	return SetConsoleTextAttribute(hConsole, wAttributes);
    }
    const int END_PRODUCE_NUMBER = 8;  //生产产品个数
    const int BUFFER_SIZE = 4;          //缓冲区个数
    int g_Buffer[BUFFER_SIZE];          //缓冲池
    int g_i, g_j;
    //信号量与关键段
    CRITICAL_SECTION g_cs;
    HANDLE g_hSemaphoreBufferEmpty, g_hSemaphoreBufferFull;
    //生产者线程函数
    unsigned int __stdcall ProducerThreadFun(PVOID pM)
    {
    	for (int i = 1; i <= END_PRODUCE_NUMBER; i++)
    	{
    		//等待有空的缓冲区出现
    		WaitForSingleObject(g_hSemaphoreBufferEmpty, INFINITE);
    
    		//互斥的访问缓冲区
    		EnterCriticalSection(&g_cs);
    		g_Buffer[g_i] = i;
    		printf("生产者在缓冲池第%d个缓冲区中投放数据%d\n", g_i, g_Buffer[g_i]);
    		g_i = (g_i + 1) % BUFFER_SIZE;
    		LeaveCriticalSection(&g_cs);
    
    		//通知消费者有新数据了
    		ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);
    	}
    	printf("生产者完成任务,线程结束运行\n");
    	return 0;
    }
    //消费者线程函数
    unsigned int __stdcall ConsumerThreadFun(PVOID pM)
    {
    	while (true)
    	{
    		//等待非空的缓冲区出现
    		WaitForSingleObject(g_hSemaphoreBufferFull, INFINITE);
    		
    		//互斥的访问缓冲区
    		EnterCriticalSection(&g_cs);
    		SetConsoleColor(FOREGROUND_GREEN);
    		printf("  编号为%d的消费者从缓冲池中第%d个缓冲区取出数据%d\n", GetCurrentThreadId(), g_j, g_Buffer[g_j]);
    		SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE);
    		if (g_Buffer[g_j] == END_PRODUCE_NUMBER)//结束标志
    		{
    			LeaveCriticalSection(&g_cs);
    			//通知其它消费者有新数据了(结束标志)
    			ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);
    			break;
    		}
    		g_j = (g_j + 1) % BUFFER_SIZE;
    		LeaveCriticalSection(&g_cs);
    
    		Sleep(50); //some other work to do
    
    		ReleaseSemaphore(g_hSemaphoreBufferEmpty, 1, NULL);
    	}
    	SetConsoleColor(FOREGROUND_GREEN);
    	printf("  编号为%d的消费者收到通知,线程结束运行\n", GetCurrentThreadId());
    	SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE);
    	return 0;
    }
    int main()
    {
    	printf("  生产者消费者问题   1生产者 2消费者 4缓冲区\n");
    	printf(" -- by MoreWindows( http://blog.csdn.net/MoreWindows ) --\n\n");
    
    	InitializeCriticalSection(&g_cs);
    	//初始化信号量,一个记录有产品的缓冲区个数,另一个记录空缓冲区个数.
    	g_hSemaphoreBufferEmpty = CreateSemaphore(NULL, 4, 4, NULL);
    	g_hSemaphoreBufferFull  = CreateSemaphore(NULL, 0, 4, NULL);
    	g_i = 0;
    	g_j = 0;
    	memset(g_Buffer, 0, sizeof(g_Buffer));
    
    	const int THREADNUM = 3;
    	HANDLE hThread[THREADNUM];
    	//生产者线程
    	hThread[0] = (HANDLE)_beginthreadex(NULL, 0, ProducerThreadFun, NULL, 0, NULL);
    	//消费者线程
    	hThread[1] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    	hThread[2] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    	WaitForMultipleObjects(THREADNUM, hThread, TRUE, INFINITE);
    	for (int i = 0; i < THREADNUM; i++)
    		CloseHandle(hThread[i]);
    
    	//销毁信号量和关键段
    	CloseHandle(g_hSemaphoreBufferEmpty);
    	CloseHandle(g_hSemaphoreBufferFull);
    	DeleteCriticalSection(&g_cs);
    	return 0;
    }

    运行结果如下图所示:

    输出结果证明各线程的同步和互斥已经完成了。

     

    至此,生产者消费者问题已经圆满的解决了,下面作个总结:

    1.首先要考虑生产者与消费者对缓冲区操作时的互斥。

    2.不管生产者与消费者有多少个,缓冲池有多少个缓冲区。都只有二个同步过程——分别是生产者要等待有空缓冲区才能投放产品,消费者要等待有非空缓冲区才能去取产品。

     

    下一篇《秒杀多线程第十一篇读者写者问题》将介绍另一个著名的同步问题——读者写者问题,欢迎大家再来参阅。

     

    转载请标明出处,原文地址:http://blog.csdn.net/morewindows/article/details/7577591

    如果觉得本文对您有帮助,请点击‘顶’支持一下,您的支持是我写作最大的动力,谢谢。

     

    展开全文
  • 信号量与生产者消费者问题

    万次阅读 多人点赞 2017-01-19 15:06:44
    生产者—消费者问题 生产者—消费者题型在各类考试(考研、程序员证书、程序员面试笔试、期末考试)很常见,原因之一是生产者...生产者—消费者题型最基本的是有界缓冲区的生产者消费者问题和无界缓冲区的生产者消费者

    生产者—消费者问题

            生产者—消费者题型在各类考试(考研、程序员证书、程序员面试笔试、期末考试)很常见,原因之一是生产者—消费者题型在实际的并发程序(多进程、多线程)设计中很常见;之二是这种题型综合性较好,涉及进程合作、互斥,有时还涉及死锁的避免。生产者—消费者题型可以全面考核你对并发程序的理解和设计能力。

            生产者—消费者题型最基本的是有界缓冲区的生产者消费者问题和无界缓冲区的生产者消费者问题,对这两个问题的解我们应该掌握其解决方案。

            对于有界缓冲区的生产者—消费者问题,两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,将信息放入缓冲区;另一个是消费者,从缓冲区中取出信息(也可以把这个问题一般化为m个生产者和n个消费者问题,但是我们只讨论一个生产者和一个消费者的情况,这样可以简化解决方案)。

     

            问题在于当缓冲区已满,而此时生产者还想向其中放入一个新的数据项的情况,其解决办法是让生产者睡眠,待消费者从缓冲区中取出一个或多个数据项时再唤醒它。同样地,当消费者试图从缓冲区取数据而发现缓冲区为空时,消费者就睡眠,直到生产者向其中放入一些数据时再将其唤醒。

            这个方法听起来很简单,为了跟踪缓冲区中的数据项数,我们需要一个变量count。如果缓冲区最多存放N个数据项,则生产者代码将首先检查count是否达到N,若是,则生产者睡眠,否则生产者向缓冲区最多存放N个数据项,则生产者代码将首先检查count是否达到N,若是,则生产者睡眠;否则生产者向缓冲区放入一个数据项并增量count的值。

            消费者的代码与此类似:首先测试count是否为0,若是,则睡眠;否则从中取出一个数据项并递减count的值。每个进程同时也检测另一个进程是否应该被唤醒,若是则唤醒之。生产者消费者的代码如下:

    #define N 100
    int count = 0;
    void producer(void)
    {
    	int item;
    	while(TRUE)
    	{
    		item = produce_item();
    		if(count == N)					//如果缓冲区满就休眠
    		sleep();
    		insert_item(item);
    		count = count + 1;				//缓冲区数据项计数加1
    		if(count == 1)
    		wakeup(consumer);
    	}
    }
    
    void consumer(void)
    {
    	int item;
    	while(TRUE)
    	{
    		if(count == 0)				//如果缓冲区空就休眠
    			sleep();
    		item = remove_item();
    		count = count - 1;			//缓冲区数据项计数减1
    		if(count == N - 1)
    			wakeup(producer);
    		consume_item(item);
    	}
    }

            这里有可能出现竞争条件,其原因是对count的访问未作限制。有可能出现以下情况:缓冲区为空,消费者刚刚读取count的值发现它为0,此时调度程序决定暂停消费者并启动运行生产者(进程切换)。生产者向缓冲区加入一个数据项,count加1。现在count的值变成了1,它推断认为count刚才为0,所以消费者此时一定在睡眠,于是生产者调用wakeup来唤醒消费者。

            但是消费者在逻辑上并未睡眠,所以wakeup信号丢失,当消费者下次运行时,它将测试先前读取的count值,发现它为0。于是睡眠,生产者迟早会填满整个缓冲区,然后睡眠,这样一来,两个进程将永远睡眠下去。

    信号量的引入及其操作

            信号量是Dijkstra在1965年提出的一种方法,它使用一个整型变量来累计唤醒次数,供以后使用。在他的建议中引入了一个新的变量类型,称作信号量(semaphore)。一个信号量的取值可以为0(表示没有保存下来的唤醒操作)或者正值(表示有一个或多个唤醒操作)。

            Dijkstra建议设立两种操作:down和up(分别为一般化后的sleep和wakeup)。对一个信号量执行down操作,则是检查其值是否大于0。若该值大于0,则将其减1(即用掉一个保存的唤醒信号)并继续;若该值为0,则进程将睡眠,而且此时down操作并未结束检查数值、修改变量值以及可能发生的睡眠操作均作为一个单一的、不可分割的原子操作完成。保证一旦一个信号量操作开始,则在该操作完成或阻塞之前,其他进程均不允许访问该信号量。这种原子性对于解决同步问题和避免竞争条件是绝对必要的。所谓原子操作,是指一组相关联的操作要么都不间断地执行,要么不执行

            up操作对信号量的值增1。如果一个或多个进程在该信号量上睡眠,无法完成一个先前的down操作,则由系统选择其中的一个(如随机挑选)并允许该进程完成它的down操作。于是,对一个有进程在其上睡眠的信号量执行一次up操作后,该信号量的值仍旧是0,但在其上睡眠的进程却少了一个。信号量的值增加1和唤醒一个进程同样也是不可分割的,不会有某个进程因执行up而阻塞,正如前面的模型中不会有进程因执行wakeup而阻塞一样。

            在Dijkstra原来的论文中,他分别使用名称P和V而不是down和up,荷兰语中,Proberen的意思是尝试,Verhogen的含义是增加或升高。

            从物理上说明信号量的P、V操作的含义。 P(S)表示申请一个资源,S.value>0表示有资源可用,其值为资源的数目;S.value=0表示无资源可用;S.value<0, 则|S.value|表示S等待队列中的进程个数。V(S)表示释放一个资源,信号量的初值应该大于等于0。P操作相当于“等待一个信号”,而V操作相当于“发送一个信号”,在实现同步过程中,V操作相当于发送一个信号说合作者已经完成了某项任务,在实现互斥过程中,V操作相当于发送一个信号说临界资源可用了。实际上,在实现互斥时,P、V操作相当于申请资源和释放资源

            该解决方案使用了三个信号量:一个称为full,用来记录充满缓冲槽数目,一个称为empty,记录空的缓冲槽总数;一个称为mutex,用来确保生产者和消费者不会同时访问缓冲区。full的初值为0,empty的初值为缓冲区中槽的数目,mutex的初值为1。供两个或多个进程使用的信号量,其初值为1,保证同时只有一个进程可以进入临界区,称作二元信号量。如果每个进程在进入临界区前都执行down操作,并在刚刚退出时执行一个up操作,就能够实现互斥。

            在下面的例子中,我们实际上是通过两种不同的方式来使用信号量,两者之间的区别是很重要的,信号量mutex用于互斥,它用于保证任一时刻只有一个进程读写缓冲区和相关的变量。互斥是避免混乱所必需的操作。

    #define N 100
    typedef int semaphore;
    semaphore mutex = 1;
    semaphore empty = N;
    semaphore full = 0;
    void producer(void)
    {
    	int item;
    	while(TRUE)
    	{
    		item = produce_item();
    		down(&empty);				//空槽数目减1,相当于P(empty)
    		down(&mutex);				//进入临界区,相当于P(mutex)
    		insert_item(item);			//将新数据放到缓冲区中
    		up(&mutex);				//离开临界区,相当于V(mutex)
    		up(&full);				//满槽数目加1,相当于V(full)
    	}
    }
    void consumer(void)
    {
    	int item;
    	while(TRUE)
    	{
    		down(&full);				//将满槽数目减1,相当于P(full)
    		down(&mutex);				//进入临界区,相当于P(mutex)
    		item = remove_item();	   		 //从缓冲区中取出数据
    		up(&mutex);				//离开临界区,相当于V(mutex)		
    		up(&empty);				//将空槽数目加1 ,相当于V(empty)
    		consume_item(item);			//处理取出的数据项
    	}
    }

     

            信号量的另一种用途是用于实现同步,信号量full和empty用来保证某种事件的顺序发生或不发生。在本例中,它们保证当缓冲区满的时候生产者停止运行,以及当缓冲区空的时候消费者停止运行。

            对于无界缓冲区的生产者—消费者问题,两个进程共享一个不限大小的公共缓冲区。由于是无界缓冲区(仓库是无界限制的),即生产者不用关心仓库是否满,只管往里面生产东西,但是消费者还是要关心仓库是否空。所以生产者不会因得不到缓冲区而被阻塞,不需要对空缓冲区进行管理,可以去掉在有界缓冲区中用来管理空缓冲区的信号量及其PV操作。

    Semaphore mutex = 1; 
    Semaphore full = 0; 
    int in = 0,out = 0;
    void producer(void)
    {
    	while(TRUE)
    	{
    		item = produce_item();
    		P(mutex);				//进入临界区
    		Buffer(in) = item;			//新生产的数据项放入缓冲区
    		in = in + 1;				//因无界,无需考虑输入指针越界
    		V(mutex);				//离开临界区
    		V(full);				//增加已用缓冲区的数目
    	}
    }
    void consumer(void)
    {
    	int item;
    	while(TRUE)
    	{
    		P(full);			//等待已用缓冲区的数目非0
    		P(mutex);			//进入临界区
    		item = Buffer(out);		//新生产的数据项放入缓冲区
    		out = out + 1;			//因无界,无需考虑输出指针越界
    		V(mutex);			//离开临界区
    		consume_item(item);		//处理取出的数据项
    	}
    }

            在计算机领域,同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程会一直等待下去。直到收到返回信息才继续执行下去。异步是指进程不需要一直等待下去,而是继续执行下面的操作,不管其他进程的状态,当有消息返回时,系统会通知进程进行处理,这样可以提高效率。

    进程同步与互斥

            在操作系统中,进程是占有资源的最小单位(线程可以访问其所在进程内的所有资源,但线程本身并不占有资源或仅仅占有一点必须资源)。但对于某些资源来说,其在同一时间只能被一个进程所占用。这些一次只能被一个进程所占用的资源就是所谓的临界资源。典型的临界资源比如物理上的打印机,或是存在硬盘或内存中被多个进程所共享的一些变量和数据等(如果这类资源不被看成临界资源加以保护,那么很有可能造成丢数据的问题)。

            对临界资源的访问,必须是互斥地进行。也就是当临界资源被占用时,另一个申请临界资源的进程会被阻塞,直到其所申请的临界资源被释放。而进程内访问临界资源的代码被成为临界区。

            进程同步也是进程之间直接的制约关系,是为完成某种任务而建立的两个或多个进程,这些进程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系。进程间的直接制约关系来源于他们之间的合作。比如说进程A需要从缓冲区读取进程B产生的信息,当缓冲区为空时,进程B因为读取不到信息而被阻塞。而当进程A产生信息放入缓冲区时,进程B才会被唤醒

     

           进程互斥是进程之间的间接制约关系。当一个进程进入临界区使用临界资源时,另一个进程必须等待。只有当使用临界资源的进程退出临界区后,这个进程才会解除阻塞状态。比如进程B需要访问打印机,但此时进程A占有了打印机,进程B会被阻塞,直到进程A释放了打印机资源,进程B才可以继续执行,概念如下图所示。

     

           进程的同步和互斥是指进程在推进时的相互制约关系。 进程同步源于进程合作,是进程间共同完成一项任务是直接发生相互作用的关系进程互斥源于对临界资源的竞争,是进程之间的间接制约关系。

           实现临界区互斥访问的基本方法有硬件实现方法和信号量方法。

           通过硬件实现临界区最简单的办法就是关CPU的中断。从计算机原理我们知道,CPU进行进程切换是需要通过中断来进行。如果屏蔽了中断那么就可以保证当前进程顺利的将临界区代码执行完,从而实现了互斥。这个办法的步骤就是:屏蔽中断—执行临界区操作—开中断。但这样做并不好,这大大限制了处理器交替执行任务的能力。并且将关中断的权限交给用户代码,那么如果用户代码屏蔽了中断后不再开,那系统岂不是跪了?

           信号量实现方式,这也是我们比较熟悉P/V操作。通过设置一个表示资源个数的信号量S,通过对信号量S的P和V操作来实现进程的的互斥。P/V操作是操作系统的原语,意味着具有原子性。P操作首先减少信号量S,表示有一个进程将占用或等待资源,然后检测S是否小于0,如果小于0则阻塞,如果大于0则占有资源进行执行。V操作是和P操作相反的操作,首先增加信号量S,表示占用或等待资源的进程减少了1,然后检测S是否小于0,如果大于0则唤醒等待使用S资源的其它进程。前面的生产者—消费者问题就是典型的应用信号量解决的进程同步问题。

    展开全文
  • 生产者消费者问题 C++实现

    万次阅读 2018-06-29 17:00:03
    生产者消费者问题 C++实现 知识准备 thread 介绍 成员类 成员函数 sleep_for 介绍 mutex 介绍 成员函数 unique_lock 介绍 成员函数 codition_variable 介绍 成员函数 代码示例 生产者消费者问题 ...
  • 生产者消费者模式

    千次阅读 2018-09-06 23:21:22
    生产者消费者模式
  • 生产者消费者模型

    千次阅读 多人点赞 2017-02-20 23:33:26
    一、什么是生产者消费者模型 在实际的开发中,经常会碰到如下场景:某个模块负责生产数据,这些数据由另一个模块来负责处理。产生数据的模块就形象的称为生产者,而处理数据的模块就称为消费者。只有生产者和消费...
  • java 生产者消费者模式

    万次阅读 2017-04-27 15:27:01
    java的生产者消费者模式,有三个部分组成,一个是生产者,一个是消费者,一个是缓存。 这么做有什么好处呢? 1.解耦(去依赖),如果是消费者直接调用生产者,那如果生产者的代码变动了,消费者的代码也需要随之变动 ...
  • 生产者消费者问题

    千次阅读 2019-09-03 23:19:34
    临界区:对临界资源进行访问的那段代码称为临界区。...使用信号量实现生产者消费者问题 使用一个容器来存放物品,只有当容器中还有空位置时,生产者才可以生产商品;只有当容器中物品数量大于0时,消费者...
  • 生产者消费者C语言

    千次阅读 2019-04-30 21:27:05
    操作系统生产者消费者问题C语言 #include <stdio.h> #include <pthread.h> #include <windows.h> #define N 100 #define true 1 #define producerNum 15 #define consumerN...
  • 使用Condition实现生产者消费者

    千次阅读 2018-11-10 11:25:21
    使用Condtion实现生产者消费者可以精确控制唤醒生产者还是消费者线程 与synchronized的等待唤醒机制相比Condition具有更多的灵活性以及精确性,这是因为notify()在唤醒线程时是随机(同一个锁),而Condition则可通过...
  • Python 实现生产者消费者模式

    千次阅读 2018-08-19 19:31:12
    生产者消费者模型 生产者消费者模式,即多条消费者线程和多条生产者线程共用同一缓冲区,线程之间协调工作。简单来说,生产者将信息发送至缓冲区,消费者将缓冲区的数据取出并进行处理。 生产者消费者模式...
  • JAVA多线程之生产者消费者模型

    万次阅读 多人点赞 2019-02-21 18:00:49
    生产者消费者模型 所谓的生产者消费者模型,是通过一个容器来解决生产者和消费者的强耦合问题。通俗的讲,就是生产者在不断的生产,消费者也在不断的消费,可是消费者消费的产品是生产者生产的,这就必然存在一个...
  • LinkedBlockingDeque实现生产者消费者

    千次阅读 2017-02-15 17:38:03
    生产者消费者原理解释: 生产者消费者的模型是指的是在多个线程间同时操作同一快的内存;一部分的线程向内存中添加数据(生产者) 一部分线程将内存中的数据取出并再这一块的内存中取消该数据(消费者);当内存中...
  • 生产者消费者模型java实现

    千次阅读 2018-09-12 10:33:45
    做题的时候遇到了生产者消费者问题,这个问题可以说是线程学习的经典题目了,就忍不住研究了一波。它描述是有一块缓冲区(队列实现)作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。在Java中这...
  • 生产者消费者算法的实现

    千次阅读 2019-03-30 21:10:49
    操作系统中典型的生产者消费者算法的实现。
  • Java实现Kafka生产者消费者功能

    万次阅读 2018-10-28 12:43:09
    Java实现Kafka生产者消费者功能 好久没有更新博客,最近学的东西很多,但一直忙的没有时间去写,先补充一篇kafka的,最基本的功能使用,不得不感叹大数据确实难,即使只说一个简单的功能,之前也需要铺垫很多完成的...
  • 1,生产者消费者问题 问题的提出 初步思考 进程资源共享关系和同步关系分析 问题的具体解决 第一搏 存在的问题 第二搏 多维度思考 1,单生产者、单消费者、多缓冲区 2,多生产者、多消费者、单缓冲 3,单...
  • 生产者消费者模式浅析

    千次阅读 2014-07-09 13:43:31
    由于最近工作中,涉及到生产者消费者设计模式,对此有一些体会,所以总结一下,与大家分享。 什么是生产者消费者模式 在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责...
  • (python)生产者消费者模型

    千次阅读 2018-04-25 16:06:57
    生产者消费者模型当中有两大类重要的角色,一个是生产者(负责造数据的任务),另一个是消费者(接收造出来的数据进行进一步的操作)。为什么要使用生产者消费者模型? 在并发编程中,如果生产者处理速度很快,而...
  • Linux生产者消费者模型实现

    千次阅读 2018-11-17 17:08:32
    任何语言提及到多线程同步都离不开生产者/消费者模型。这也是针对许多现实问题建模用到的基础模型。这一篇就来看一下在Linux环境下,C语言实现的两种生产者消费者模型。 关键字:Linux C 生产者 消费者 条件变量...
  • 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找...
  • Linux实现生产者消费者模型

    千次阅读 2017-06-01 11:35:03
    生产者消费者模型 简单来说就是“321原则(并非某一规则,而是为了理解生产者消费者模型)” “3”代表的是三种关系 生产者与消费者的互斥与同步关系 生产者与生产者的互斥(或竞争)关系 消费者与消费者的...
  • 线程面试:生产者 消费者问题

    千次阅读 2017-07-07 15:04:00
    生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。解决生产者/消费者问题的方法可分为两类:(1)采用某种机制...
  • LinkedBlockingQueue 实现 生产者 消费者

    千次阅读 2018-03-21 16:52:38
    是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消...
  • 生产者消费者模式的实现(java实现)

    万次阅读 2018-09-03 18:30:36
    生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 44,039
精华内容 17,615
关键字:

生产者消费者