精华内容
下载资源
问答
  • 【操作系统】生产者消费者问题

    万次阅读 多人点赞 2018-08-11 00:43:20
    生产者消费者模型 生产者消费者模型 一、 生产者消费者问题 二、 问题分析 三、 伪代码实现 四、代码实现(C++) 五、 互斥锁与条件变量的使用比较 一、 生产者消费者问题 生产者消费者问题...

    生产者消费者模型

    一、 生产者消费者问题

    生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
    .
    要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

    这里写图片描述


    二、 问题分析

    该问题需要注意的几点:

    • 在缓冲区为空时,消费者不能再进行消费
    • 在缓冲区为满时,生产者不能再进行生产
    • 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步
    • 注意条件变量与互斥锁的顺序

    这里写图片描述
    由于前两点原因,因此需要保持线程间的同步,即一个线程消费(或生产)完,其他线程才能进行竞争CPU,获得消费(或生产)的机会。对于这一点,可以使用条件变量进行线程间的同步:生产者线程在product之前,需要wait直至获取自己所需的信号量之后,才会进行product的操作;同样,对于消费者线程,在consume之前需要wait直到没有线程在访问共享区(缓冲区),再进行consume的操作,之后再解锁并唤醒其他可用阻塞线程。

    这里写图片描述
    在访问共享区资源时,为避免多个线程同时访问资源造成混乱,需要对共享资源加锁,从而保证某一时刻只有一个线程在访问共享资源。


    三、 伪代码实现

    假设缓冲区大小为10,生产者、消费者线程若干。生产者和消费者相互等效,只要缓冲池未满,生产者便可将消息送入缓冲池;只要缓冲池未空,消费者便可从缓冲池中取走一个消息。

    • items代表缓冲区已经使用的资源数,spaces代表缓冲区可用资源数
    • mutex代表互斥锁
    • buf[10] 代表缓冲区,其内容类型为item
    • in、out代表第一个资源和最后一个资源
    var items = 0, space = 10, mutex = 1;
    var in = 0, out = 0;
    item buf[10] = { NULL };
    
    producer {
        while( true ) {
            wait( space );  // 等待缓冲区有空闲位置, 在使用PV操作时,条件变量需要在互斥锁之前
            wait( mutex );  // 保证在product时不会有其他线程访问缓冲区
    
            // product
            buf.push( item, in );  // 将新资源放到buf[in]位置 
            in = ( in + 1 ) % 10;
            
            signal( mutex );  // 唤醒的顺序可以不同
            signal( items );  // 通知consumer缓冲区有资源可以取走
        }
    }
    
    consumer {
        while( true ) {
            wait( items );  // 等待缓冲区有资源可以使用
            wait( mutex );  // 保证在consume时不会有其他线程访问缓冲区
    
            // consume
            buf.pop( out );  // 将buf[out]位置的的资源取走
            out = ( out + 1 ) % 10;
    
            signal( mutex );  // 唤醒的顺序可以不同
            signal( space );  // 通知缓冲区有空闲位置
        }
    }
    

    不能将线程里两个wait的顺序调换否则会出现死锁。例如(调换后),将consumer的两个wait调换,在producer发出signal信号后,如果producer线程此时再次获得运行机会,执行完了wait(space),此时,另一个consumer线程获得运行机会,执行了 wait(mutex) ,如果此时缓冲区为空,那么consumer将会阻塞在wait(items),而producer也会因为无法获得锁的所有权所以阻塞在wait(mutex),这样两个线程都在阻塞,也就造成了死锁。


    四、代码实现(C++)

    #include <iostream>
    #include <string.h>
    #include <pthread.h>
    #include <unistd.h>
    using namespace std;
    
    int current = 0;  // producer运行加1,consumer运行减1
    int buf[10];
    int in = 0, out = 0;
    int items = 0, spaces = 10;
    bool flag;  // 标记线程结束运行
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t notfull = PTHREAD_COND_INITIALIZER;  // 缓冲区不满
    pthread_cond_t notempty = PTHREAD_COND_INITIALIZER;  // 缓冲区不空
    
    void *producer( void *arg ) {
        while( flag ) {
            pthread_mutex_lock( &mutex );  // 为保证条件变量不会因为多线程混乱,所以先加锁
            while( !spaces ) {  // 避免“惊群”效应,避免因其他线程实现得到事件而导致该线程“假醒”
                pthread_cond_wait( &notfull, &mutex );
            }
            buf[in] = current++;
            in = ( in + 1 ) % 10;
            items++;
            spaces--;
    
            printf( "producer %zu , current = %d\n", pthread_self(), current );
            for( int i = 0; i < 10; i++ ) {
                printf( "%-4d", buf[i] );
            }
            printf( "\n\n" );
    
            pthread_cond_signal( &notempty );
            pthread_mutex_unlock( &mutex );
        }
        pthread_exit( NULL );
    }
    
    void *consumer( void *arg ) {
        while( flag ) {
            pthread_mutex_lock( &mutex );
            while( !items ) {
                pthread_cond_wait( &notempty, &mutex );
            }
            buf[out] = -1;
            out = ( out + 1 ) % 10;
            current--;
            items--;
            spaces++;
    
            printf( "consumer %zu , current = %d\n", pthread_self(), current );
            for( int i = 0; i < 10; i++ ) {
                printf( "%-4d", buf[i] );
            }
            printf( "\n\n" );
    
            pthread_cond_signal( &notfull );
            pthread_mutex_unlock( &mutex );
        }
        pthread_exit( NULL );
    }
    
    int main() {
        memset( buf, -1, sizeof(buf) );
        flag = true;
        pthread_t pro[10], con[10];
        int i = 0;
    
        for( int i = 0; i < 10; i++ ) {
            pthread_create( &pro[i], NULL, producer, NULL );
            pthread_create( &con[i], NULL, consumer, NULL );
        }
    
        sleep(1);  // 让线程运行一秒
        flag = false;
    
        for( int i = 0; i < 10; i++ ) {
            pthread_join( pro[i], NULL );
            pthread_join( con[i], NULL );
        }
    
        return 0;
    } 
    

    五、 互斥锁与条件变量的使用比较

    我们会发现,在伪代码中强调了条件变量在前,互斥锁在后,而到了代码实现时又变成了先加互斥锁,再进行循环pthread_cond_wait()。这不是自相矛盾吗?

    其实,在伪代码中的wait()signal()就是操作系统中的PV操作,而PV操作定义就保证了该语句是原子操作,因此在wait条件变量改变的时候不会因为多进程同时访问共享资源造成混乱,所以为了保证线程间的同步,需要先加条件变量,等事件可使用后才进行线程相应的操作,此时互斥锁的作用是保证共享资源不会被其他线程访问。

    而在代码实现中,signal()对应的时pthread_cond_wait()函数,该函数在执行时会有三步:

    • 解开当前的锁
    • 等待条件变量达到所需要的状态
    • 再把之前解开的锁加锁

    为了实现将pthread_cond_wait()变成原子操作,就需要在该函数之前添加互斥锁。因为pthread_cond_wait()可以解锁,也就不会发生像伪代码所说的死锁问题。相反,如果像伪代码那样先使用条件变量,后加锁,则会造成多个线程同时访问共享资源的问题,造成数据的混乱。


    欢迎关注微信公众号,不定时分享学习资料与学习笔记,感谢!
    在这里插入图片描述

    展开全文
  • 生产者与消费者问题C语言实现

    万次阅读 多人点赞 2018-05-27 10:51:24
    生产者-消费者问题是典型的PV操作问题,假设系统中有一个比较大的缓冲池,生产者的任务是只要缓冲池未满就可以将生产出的产品放入其中,而消费者的任务是只要缓冲池未空就可以从缓冲池中拿走产品。缓冲池被占用时,...

    实验目的
    ①实现生产者—消费者问题的模拟,以便更好的理解此经典进程同步问题。生产者-消费者问题是典型的PV操作问题,假设系统中有一个比较大的缓冲池,生产者的任务是只要缓冲池未满就可以将生产出的产品放入其中,而消费者的任务是只要缓冲池未空就可以从缓冲池中拿走产品。缓冲池被占用时,任何进程都不能访问。

    ②每一个生产者都要把自己生产的产品放入缓冲池,每个消费者从缓冲池中取走产品消费。在这种情况下,生产者消费者进程同步,因为只有通过互通消息才知道是否能存入产品或者取走产品。他们之间也存在互斥,即生产者消费者必须互斥访问缓冲池,即不能有两个以上的进程同时进行。

    实验原理
    在同一个进程地址空间内执行两个线程。生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻挡,直到新的物品被生产出来。

    生产者流程图
    这里写图片描述
    消费者流程图
    这里写图片描述
    注意点
    ①本次实验是关于生产者与消费者之间互斥和同步的问题。问题的是指是P、V操作,实验设一个共享缓冲区,生产者和消费者互斥的使用,当一个线程使用缓冲区的时候,另一个让其等待直到前一个线程释放缓冲区为止。
    ②生产者与消费者是一个与现实有关的经验问题,通过此原理举一反三可以解决其他类似的问题。 通过本实验设计,我们对操作系统的P、V进一步的认识,深入的了解P、V操作的实质和其重要性。课本的理论知识进一步阐述了现实中的实际问题。
    ③Linux环境下编写变异C语言有Windows稍有不同,注意在Linux中编译带有线程

    #include <stdio.h>
    #include <pthread.h>
    #include <windows.h>
    #define N 100
    #define true 1
    #define producerNum  10
    #define consumerNum  5
    #define sleepTime 1000
    
    typedef int semaphore;
    typedef int item;
    item buffer[N] = {0};
    int in = 0;
    int out = 0;
    int proCount = 0;
    semaphore mutex = 1, empty = N, full = 0, proCmutex = 1;
    
    void * producer(void * a){
        while(true){
            while(proCmutex <= 0);
            proCmutex--;
            proCount++;
            printf("生产一个产品ID%d, 缓冲区位置为%d\n",proCount,in);
            proCmutex++;
    
            while(empty <= 0){
                printf("缓冲区已满!\n");
            }
            empty--;
    
            while(mutex <= 0);
            mutex--;
    
            buffer[in] = proCount;
            in = (in + 1) % N;
    
            mutex++;
            full++;
            Sleep(sleepTime);
        }
    }
    
    void * consumer(void *b){
        while(true){
            while(full <= 0){
                printf("缓冲区为空!\n");
            }
            full--;
    
            while(mutex <= 0);
            mutex--;
    
            int nextc = buffer[out];
            buffer[out] = 0;//消费完将缓冲区设置为0
    
            out = (out + 1) % N;
    
            mutex++;
            empty++;
    
            printf("\t\t\t\t消费一个产品ID%d,缓冲区位置为%d\n", nextc,out);
            Sleep(sleepTime);
        }
    }
    
    int main()
    {
        pthread_t threadPool[producerNum+consumerNum];
        int i;
        for(i = 0; i < producerNum; i++){
            pthread_t temp;
            if(pthread_create(&temp, NULL, producer, NULL) == -1){
                printf("ERROR, fail to create producer%d\n", i);
                exit(1);
            }
            threadPool[i] = temp;
        }//创建生产者进程放入线程池
    
    
        for(i = 0; i < consumerNum; i++){
            pthread_t temp;
            if(pthread_create(&temp, NULL, consumer, NULL) == -1){
                printf("ERROR, fail to create consumer%d\n", i);
                exit(1);
            }
            threadPool[i+producerNum] = temp;
        }//创建消费者进程放入线程池
    
    
        void * result;
        for(i = 0; i < producerNum+consumerNum; i++){
            if(pthread_join(threadPool[i], &result) == -1){
                printf("fail to recollect\n");
                exit(1);
            }
        }//运行线程池
        return 0;
    }

    这里写图片描述
    更多内容访问omegaxyz.com
    网站所有代码采用Apache 2.0授权
    网站文章采用知识共享许可协议BY-NC-SA4.0授权
    © 2018 • OmegaXYZ-版权所有 转载请注明出处

    展开全文
  • Java实现Kafka生产者和消费者的示例

    万次阅读 多人点赞 2021-01-05 10:06:08
    Java实现Kafka生产者和消费者的示例

    文章持续更新,微信搜索「万猫学社」第一时间阅读。
    关注后回复「电子书」,免费获取12本Java必读技术书籍。

    Kafka简介

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。

    方式一:kafka-clients

    引入依赖

    在pom.xml文件中,引入kafka-clients依赖:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.1</version>
    </dependency>
    

    生产者

    创建一个KafkaProducer的生产者实例:

    @Configuration
    public class Config {
    
        public final static String bootstrapServers = "127.0.0.1:9092";
    
        @Bean(destroyMethod = "close")
        public KafkaProducer<String, String> kafkaProducer() {
            Properties props = new Properties();
            //设置Kafka服务器地址
            props.put("bootstrap.servers", bootstrapServers);
            //设置数据key的序列化处理类
            props.put("key.serializer", StringSerializer.class.getName());
            //设置数据value的序列化处理类
            props.put("value.serializer", StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            return producer;
        }
    }
    

    在Controller中进行使用:

    @RestController
    @Slf4j
    public class Controller {
    
        @Autowired
        private KafkaProducer<String, String> kafkaProducer;
    
        @RequestMapping("/kafkaClientsSend")
        public String send() {
            String uuid = UUID.randomUUID().toString();
            RecordMetadata recordMetadata = null;
            try {
            	//将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
                recordMetadata = kafkaProducer.send(new ProducerRecord<>("one-more-topic", uuid)).get();
                log.info("recordMetadata: {}", recordMetadata);
                log.info("uuid: {}", uuid);
            } catch (Exception e) {
                log.error("send fail, uuid: {}", uuid, e);
            }
            return uuid;
        }
    }
    

    消费者

    创建一个KafkaConsumer的消费者实例:

    @Configuration
    public class Config {
    
        public final static String groupId = "kafka-clients-group";
        public final static String bootstrapServers = "127.0.0.1:9092";
    
        @Bean(destroyMethod = "close")
        public KafkaConsumer<String, String> kafkaConsumer() {
            Properties props = new Properties();
            //设置Kafka服务器地址
            props.put("bootstrap.servers", bootstrapServers);
            //设置消费组
            props.put("group.id", groupId);
            //设置数据key的反序列化处理类
            props.put("key.deserializer", StringDeserializer.class.getName());
            //设置数据value的反序列化处理类
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            //订阅名称为“one-more-topic”的Topic的消息
            kafkaConsumer.subscribe(Arrays.asList("one-more-topic"));
            return kafkaConsumer;
        }
    }
    

    在Controller中进行使用:

    @RestController
    @Slf4j
    public class Controller {
    
        @Autowired
        private KafkaConsumer<String, String> kafkaConsumer;
    
        @RequestMapping("/receive")
        public List<String> receive() {
        	从Kafka服务器中的名称为“one-more-topic”的Topic中消费消息
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            List<String> messages = new ArrayList<>(records.count());
            for (ConsumerRecord<String, String> record : records.records("one-more-topic")) {
                String message = record.value();
                log.info("message: {}", message);
                messages.add(message);
            }
            return messages;
        }
    }
    

    方式二:spring-kafka

    使用kafka-clients需要我们自己创建生产者或者消费者的bean,如果我们的项目基于SpringBoot构建,那么使用spring-kafka就方便多了。

    引入依赖

    在pom.xml文件中,引入spring-kafka依赖:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.3.12.RELEASE</version>
    </dependency>
    

    生产者

    在application.yml文件中增加配置:

    spring:
      kafka:
      	#Kafka服务器地址
        bootstrap-servers: 127.0.0.1:9092
        producer:
          #设置数据value的序列化处理类
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    在Controller中注入KafkaTemplate就可以直接使用了,代码如下:

    @RestController
    @Slf4j
    public class Controller {
    
        @Autowired
        private KafkaTemplate<String, String> template;
    
        @RequestMapping("/springKafkaSend")
        public String send() {
            String uuid = UUID.randomUUID().toString();
            //将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
            this.template.send("one-more-topic", uuid);
            log.info("uuid: {}", uuid);
            return uuid;
        }
    }
    

    消费者

    在application.yml文件中增加配置:

    spring:
      kafka:
        #Kafka服务器地址
        bootstrap-servers: 127.0.0.1:9092
        consumer:
          #设置数据value的反序列化处理类
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    

    创建一个可以被Spring框架扫描到的类,并且在方法上加上@KafkaListener注解,就可以消费消息了,代码如下:

    @Component
    @Slf4j
    public class Receiver {
    
        @KafkaListener(topics = "one-more-topic", groupId = "spring-kafka-group")
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                String message = (String) kafkaMessage.get();
                log.info("message: {}", message);
            }
        }
    }
    

    文章持续更新,微信搜索「万猫学社」第一时间阅读。
    关注后回复「电子书」,免费获取12本Java必读技术书籍。

    展开全文
  • 秒杀多线程第十篇 生产者消费者问题

    万次阅读 多人点赞 2012-05-21 10:18:09
    生产者消费者问题是一个著名的线程同步问题,该问题描述如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将...

        继经典线程同步问题之后,我们来看看生产者消费者问题及读者写者问题。生产者消费者问题是一个著名的线程同步问题,该问题描述如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将它生产的产品放入一个缓冲区中,消费者可以从缓冲区中取走产品进行消费,显然生产者和消费者之间必须保持同步,即不允许消费者到一个空的缓冲区中取产品,也不允许生产者向一个已经放入产品的缓冲区中再次投放产品。

        这个生产者消费者题目不仅常用于操作系统的课程设计,也常常在程序员和软件设计师考试中出现。并且在计算机考研的专业课考试中也是一个非常热门的问题。因此现在就针对这个问题进行详细深入的解答。

     

        首先来简化问题,先假设生产者和消费者都只有一个,且缓冲区也只有一个。这样情况就简便多了。

        第一.从缓冲区取出产品和向缓冲区投放产品必须是互斥进行的。可以用关键段互斥量来完成。

        第二.生产者要等待缓冲区为空,这样才可以投放产品,消费者要等待缓冲区不为空,这样才可以取出产品进行消费。并且由于有二个等待过程,所以要用二个事件信号量来控制。

        考虑这二点后,代码很容易写出来。另外为了美观起见,将消费者的输出颜色设置为彩色,有关如何在控制台下设置彩色输出请参阅《VC 控制台颜色设置》。

    //1生产者 1消费者 1缓冲区
    //使用二个事件,一个表示缓冲区空,一个表示缓冲区满。
    //再使用一个关键段来控制缓冲区的访问
    #include <stdio.h>
    #include <process.h>
    #include <windows.h>
    //设置控制台输出颜色
    BOOL SetConsoleColor(WORD wAttributes)
    {
    	HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
    	if (hConsole == INVALID_HANDLE_VALUE)
    		return FALSE;	
    	return SetConsoleTextAttribute(hConsole, wAttributes);
    }
    const int END_PRODUCE_NUMBER = 10;   //生产产品个数
    int g_Buffer;                        //缓冲区
    //事件与关键段
    CRITICAL_SECTION g_cs;
    HANDLE g_hEventBufferEmpty, g_hEventBufferFull;
    //生产者线程函数
    unsigned int __stdcall ProducerThreadFun(PVOID pM)
    {
    	for (int i = 1; i <= END_PRODUCE_NUMBER; i++)
    	{
    		//等待缓冲区为空
    		WaitForSingleObject(g_hEventBufferEmpty, INFINITE);
    
    		//互斥的访问缓冲区
    		EnterCriticalSection(&g_cs);
    		g_Buffer = i;
    		printf("生产者将数据%d放入缓冲区\n", i);
    		LeaveCriticalSection(&g_cs);
    		
    		//通知缓冲区有新数据了
    		SetEvent(g_hEventBufferFull);
    	}
    	return 0;
    }
    //消费者线程函数
    unsigned int __stdcall ConsumerThreadFun(PVOID pM)
    {
    	volatile bool flag = true;
    	while (flag)
    	{
    		//等待缓冲区中有数据
    		WaitForSingleObject(g_hEventBufferFull, INFINITE);
    		
    		//互斥的访问缓冲区
    		EnterCriticalSection(&g_cs);
    		SetConsoleColor(FOREGROUND_GREEN);
    		printf("  消费者从缓冲区中取数据%d\n", g_Buffer);
    		SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE);
    		if (g_Buffer == END_PRODUCE_NUMBER)
    			flag = false;
    		LeaveCriticalSection(&g_cs);
    		
    		//通知缓冲区已为空
    		SetEvent(g_hEventBufferEmpty);
    
    		Sleep(10); //some other work should to do
    	}
    	return 0;
    }
    int main()
    {
    	printf("  生产者消费者问题   1生产者 1消费者 1缓冲区\n");
    	printf(" -- by MoreWindows( http://blog.csdn.net/MoreWindows ) --\n\n");
    
    	InitializeCriticalSection(&g_cs);
    	//创建二个自动复位事件,一个表示缓冲区是否为空,另一个表示缓冲区是否已经处理
    	g_hEventBufferEmpty = CreateEvent(NULL, FALSE, TRUE, NULL);
    	g_hEventBufferFull = CreateEvent(NULL, FALSE, FALSE, NULL);
    	
    	const int THREADNUM = 2;
    	HANDLE hThread[THREADNUM];
    	
    	hThread[0] = (HANDLE)_beginthreadex(NULL, 0, ProducerThreadFun, NULL, 0, NULL);
    	hThread[1] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    	WaitForMultipleObjects(THREADNUM, hThread, TRUE, INFINITE);
    	CloseHandle(hThread[0]);
    	CloseHandle(hThread[1]);
    	
    	//销毁事件和关键段
    	CloseHandle(g_hEventBufferEmpty);
    	CloseHandle(g_hEventBufferFull);
    	DeleteCriticalSection(&g_cs);
    	return 0;
    }

    运行结果如下所示:

    可以看出生产者与消费者已经是有序的工作了。

     

        然后再对这个简单生产者消费者问题加大难度。将消费者改成2个,缓冲池改成拥有4个缓冲区的大缓冲池。

        如何来思考了这个问题了?首先根据上面分析的二点,可以知道生产者和消费者由一个变成多个的影响不大,唯一要注意的是缓冲池变大了,回顾一下《秒杀多线程第八篇 经典线程同步 信号量Semaphore》中的信号量,不难得出用二个信号量就可以解决这种缓冲池有多个缓冲区的情况——用一个信号量A来记录为空的缓冲区个数,另一个信号量B记录非空的缓冲区个数,然后生产者等待信号量A,消费者等待信号量B就可以了。因此可以仿照上面的代码来实现复杂生产者消费者问题,示例代码如下:

    //1生产者 2消费者 4缓冲区
    #include <stdio.h>
    #include <process.h>
    #include <windows.h>
    //设置控制台输出颜色
    BOOL SetConsoleColor(WORD wAttributes)
    {
    	HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
    	if (hConsole == INVALID_HANDLE_VALUE)
    		return FALSE;
    	
    	return SetConsoleTextAttribute(hConsole, wAttributes);
    }
    const int END_PRODUCE_NUMBER = 8;  //生产产品个数
    const int BUFFER_SIZE = 4;          //缓冲区个数
    int g_Buffer[BUFFER_SIZE];          //缓冲池
    int g_i, g_j;
    //信号量与关键段
    CRITICAL_SECTION g_cs;
    HANDLE g_hSemaphoreBufferEmpty, g_hSemaphoreBufferFull;
    //生产者线程函数
    unsigned int __stdcall ProducerThreadFun(PVOID pM)
    {
    	for (int i = 1; i <= END_PRODUCE_NUMBER; i++)
    	{
    		//等待有空的缓冲区出现
    		WaitForSingleObject(g_hSemaphoreBufferEmpty, INFINITE);
    
    		//互斥的访问缓冲区
    		EnterCriticalSection(&g_cs);
    		g_Buffer[g_i] = i;
    		printf("生产者在缓冲池第%d个缓冲区中投放数据%d\n", g_i, g_Buffer[g_i]);
    		g_i = (g_i + 1) % BUFFER_SIZE;
    		LeaveCriticalSection(&g_cs);
    
    		//通知消费者有新数据了
    		ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);
    	}
    	printf("生产者完成任务,线程结束运行\n");
    	return 0;
    }
    //消费者线程函数
    unsigned int __stdcall ConsumerThreadFun(PVOID pM)
    {
    	while (true)
    	{
    		//等待非空的缓冲区出现
    		WaitForSingleObject(g_hSemaphoreBufferFull, INFINITE);
    		
    		//互斥的访问缓冲区
    		EnterCriticalSection(&g_cs);
    		SetConsoleColor(FOREGROUND_GREEN);
    		printf("  编号为%d的消费者从缓冲池中第%d个缓冲区取出数据%d\n", GetCurrentThreadId(), g_j, g_Buffer[g_j]);
    		SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE);
    		if (g_Buffer[g_j] == END_PRODUCE_NUMBER)//结束标志
    		{
    			LeaveCriticalSection(&g_cs);
    			//通知其它消费者有新数据了(结束标志)
    			ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);
    			break;
    		}
    		g_j = (g_j + 1) % BUFFER_SIZE;
    		LeaveCriticalSection(&g_cs);
    
    		Sleep(50); //some other work to do
    
    		ReleaseSemaphore(g_hSemaphoreBufferEmpty, 1, NULL);
    	}
    	SetConsoleColor(FOREGROUND_GREEN);
    	printf("  编号为%d的消费者收到通知,线程结束运行\n", GetCurrentThreadId());
    	SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE);
    	return 0;
    }
    int main()
    {
    	printf("  生产者消费者问题   1生产者 2消费者 4缓冲区\n");
    	printf(" -- by MoreWindows( http://blog.csdn.net/MoreWindows ) --\n\n");
    
    	InitializeCriticalSection(&g_cs);
    	//初始化信号量,一个记录有产品的缓冲区个数,另一个记录空缓冲区个数.
    	g_hSemaphoreBufferEmpty = CreateSemaphore(NULL, 4, 4, NULL);
    	g_hSemaphoreBufferFull  = CreateSemaphore(NULL, 0, 4, NULL);
    	g_i = 0;
    	g_j = 0;
    	memset(g_Buffer, 0, sizeof(g_Buffer));
    
    	const int THREADNUM = 3;
    	HANDLE hThread[THREADNUM];
    	//生产者线程
    	hThread[0] = (HANDLE)_beginthreadex(NULL, 0, ProducerThreadFun, NULL, 0, NULL);
    	//消费者线程
    	hThread[1] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    	hThread[2] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    	WaitForMultipleObjects(THREADNUM, hThread, TRUE, INFINITE);
    	for (int i = 0; i < THREADNUM; i++)
    		CloseHandle(hThread[i]);
    
    	//销毁信号量和关键段
    	CloseHandle(g_hSemaphoreBufferEmpty);
    	CloseHandle(g_hSemaphoreBufferFull);
    	DeleteCriticalSection(&g_cs);
    	return 0;
    }

    运行结果如下图所示:

    输出结果证明各线程的同步和互斥已经完成了。

     

    至此,生产者消费者问题已经圆满的解决了,下面作个总结:

    1.首先要考虑生产者与消费者对缓冲区操作时的互斥。

    2.不管生产者与消费者有多少个,缓冲池有多少个缓冲区。都只有二个同步过程——分别是生产者要等待有空缓冲区才能投放产品,消费者要等待有非空缓冲区才能去取产品。

     

    下一篇《秒杀多线程第十一篇读者写者问题》将介绍另一个著名的同步问题——读者写者问题,欢迎大家再来参阅。

     

    转载请标明出处,原文地址:http://blog.csdn.net/morewindows/article/details/7577591

    如果觉得本文对您有帮助,请点击‘顶’支持一下,您的支持是我写作最大的动力,谢谢。

     

    展开全文
  • Java多种方式解决生产者消费者问题(十分详细)

    万次阅读 多人点赞 2018-08-16 08:40:50
    生产者消费者问题 一、问题描述 生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,...
  • Java多线程技术~生产者和消费者问题

    万次阅读 热门讨论 2020-06-16 14:15:18
    Java多线程技术~生产者和消费者问题 本文是上一篇文章的后续,详情点击该连接 线程通信 应用场景:生产者和消费者问题        假设仓库中只能存放一件产品,生产者将生产出来的...
  • Java实现生产者和消费者模式

    万次阅读 多人点赞 2019-10-26 18:39:07
    生产者、消费者模型会出现的问题出发,谈了一下对生产者、消费者模型的理解,并配有完整的代码实现。
  • 信号量与生产者消费者问题

    万次阅读 多人点赞 2017-01-19 15:06:44
    生产者—消费者问题 生产者—消费者题型在各类考试(考研、程序员证书、程序员面试笔试、期末考试)很常见,原因之一是生产者—消费者题型在实际的并发程序(多进程、多线程)设计中很常见;之二是这种题型综合性较...
  • 生产者消费者问题 C++实现

    万次阅读 2018-06-29 17:00:03
    生产者消费者问题 C++实现 知识准备 thread 介绍 成员类 成员函数 sleep_for 介绍 mutex 介绍 成员函数 unique_lock 介绍 成员函数 codition_variable 介绍 成员函数 代码示例 生产者消费者问题 ...
  • 生产者消费者模式

    千次阅读 2018-09-06 23:21:22
    生产者消费者模式
  • 生产者与消费者模型

    千次阅读 2020-09-26 18:40:01
    生产者-消费者模式是一个经典的多线程设计模式。 在生产者-消费者模式中,通常有两类线程,即若干个生产者和消费者线程。 生产者线程负责提交用户请求 消费者线程负责处理生产者提交的任务。 内存缓冲区 缓存生产者...
  • 实现生产者消费者的三种方式

    千次阅读 多人点赞 2019-10-14 21:22:03
    文章目录wait/notify的消息通知机制预备知识wait/notify消息通知潜在的一些问题notify过早通知等待wait的条件发生变化假死状态wait/notifyAll实现生产者-消费者使用Lock中Condition的await/signalAll实现生产者-消费...
  • 生产者消费者模型

    千次阅读 多人点赞 2017-02-20 23:33:26
    一、什么是生产者消费者模型 在实际的开发中,经常会碰到如下场景:某个模块负责生产数据,这些数据由另一个模块来负责处理。产生数据的模块就形象的称为生产者,而处理数据的模块就称为消费者。只有生产者和消费...
  • java 生产者消费者模式

    万次阅读 2017-04-27 15:27:01
    java的生产者消费者模式,有三个部分组成,一个是生产者,一个是消费者,一个是缓存。 这么做有什么好处呢? 1.解耦(去依赖),如果是消费者直接调用生产者,那如果生产者的代码变动了,消费者的代码也需要随之变动 ...
  • 生产者/消费者模式的理解及实现

    万次阅读 多人点赞 2018-05-31 10:05:37
    ★简介 生产者消费者模式并不是GOF提出的23种设计模式之一,23种设计模式都是建立在面向对象的基础之上的,但其实面向过程的编程中也有很多高效的编程模式,生产者消费者模式便是其中之一,它是我们编程过程中最...
  • 生产者消费者问题

    千次阅读 2019-09-03 23:19:34
    临界区:对临界资源进行访问的那段代码称为临界区。为了互斥访问临界资源,每个进程在进入临界区...使用一个容器来存放物品,只有当容器中还有空位置时,生产者才可以生产商品;只有当容器中物品数量大于0时,消费者...
  • 生产者-消费者问题

    千次阅读 2019-07-17 19:39:45
    生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。解决生产者 / 消费者问题的方法可分为两类:(1)采用某种机制...
  • 文章目录生产者、消费者问题描述PV操作题目分析步骤多生产者、多消费者问题描述问题分析吸烟者问题描述问题分析 生产者、消费者问题描述 系统中有一组生产者进程和一组消费者进程,生产者进程每次生成一个产品放入...
  • java实现生产者和消费者

    千次阅读 2019-06-19 11:29:10
    本篇博文主要介绍如何使用java来实现简单的生产者和消费者 来帮助大家简单的入门理解生产者和消费者模式 一、对生产者消费者的理解 生产者消费者模式是并发、多线程编程中经典的设计模式。 简单来看,就是一个类...
  • Java——生产者-消费者问题(GUI)

    万次阅读 2020-05-01 19:02:43
    设计一个模拟仿真“生产者-消费者”问题的解决过程及方法的程序
  • 使用Condition实现生产者消费者

    千次阅读 2018-11-10 11:25:21
    使用Condtion实现生产者消费者可以精确控制唤醒生产者还是消费者线程 与synchronized的等待唤醒机制相比Condition具有更多的灵活性以及精确性,这是因为notify()在唤醒线程时是随机(同一个锁),而Condition则可通过...
  • 生产者消费者C语言

    千次阅读 2019-04-30 21:27:05
    操作系统生产者消费者问题C语言 #include <stdio.h> #include <pthread.h> #include <windows.h> #define N 100 #define true 1 #define producerNum 15 #define consumerN...
  • Kafka消费者生产者实例

    万次阅读 2017-07-30 18:22:56
    为了更为直观展示Kafka的消息生产消费的过程,我会从基于Console和基于Application两个方面介绍使用实例。...由于主要介绍如何使用Kafka快速构建生产者消费者实例,所以不会涉及Kafka内部的原理。一个基于Kafk
  • 深入分析Kafka生产者和消费者

    千次阅读 2020-10-29 22:43:00
    深入Kafka生产者和消费者Kafka生产者消息发送的流程发送方式生产者属性配置序列化器分区器Kafka消费者消费者群组消费者属性配置消费者基础概念消费者核心概念 Kafka生产者 消息发送的流程 生产者每发送一条消息需要...
  • 生产者消费者 & 读者写者

    千次阅读 2016-07-17 20:50:51
    消费者生产者 读者写者
  • 生产者与消费者分析

    千次阅读 2018-08-04 10:06:53
    众所周知大型网站都是并发支持,用到最多的也是生产者与消费者模式。 生产消费者模型  生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 70,540
精华内容 28,216
关键字:

生产者