-
【操作系统】生产者消费者问题
2018-08-11 00:43:20生产者消费者模型 生产者消费者模型 一、 生产者消费者问题 二、 问题分析 三、 伪代码实现 四、代码实现(C++) 五、 互斥锁与条件变量的使用比较 一、 生产者消费者问题 生产者消费者问题...生产者消费者模型 一、 生产者消费者问题
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
.
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。
二、 问题分析
该问题需要注意的几点:
- 在缓冲区为空时,消费者不能再进行消费
- 在缓冲区为满时,生产者不能再进行生产
- 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步
- 注意条件变量与互斥锁的顺序
由于前两点原因,因此需要保持线程间的同步,即一个线程消费(或生产)完,其他线程才能进行竞争CPU,获得消费(或生产)的机会。对于这一点,可以使用条件变量进行线程间的同步:生产者线程在product之前,需要wait直至获取自己所需的信号量之后,才会进行product的操作;同样,对于消费者线程,在consume之前需要wait直到没有线程在访问共享区(缓冲区),再进行consume的操作,之后再解锁并唤醒其他可用阻塞线程。
在访问共享区资源时,为避免多个线程同时访问资源造成混乱,需要对共享资源加锁,从而保证某一时刻只有一个线程在访问共享资源。
三、 伪代码实现
假设缓冲区大小为10,生产者、消费者线程若干。生产者和消费者相互等效,只要缓冲池未满,生产者便可将消息送入缓冲池;只要缓冲池未空,消费者便可从缓冲池中取走一个消息。
- items代表缓冲区已经使用的资源数,spaces代表缓冲区可用资源数
- mutex代表互斥锁
- buf[10] 代表缓冲区,其内容类型为item
- in、out代表第一个资源和最后一个资源
var items = 0, space = 10, mutex = 1; var in = 0, out = 0; item buf[10] = { NULL }; producer { while( true ) { wait( space ); // 等待缓冲区有空闲位置, 在使用PV操作时,条件变量需要在互斥锁之前 wait( mutex ); // 保证在product时不会有其他线程访问缓冲区 // product buf.push( item, in ); // 将新资源放到buf[in]位置 in = ( in + 1 ) % 10; signal( mutex ); // 唤醒的顺序可以不同 signal( items ); // 通知consumer缓冲区有资源可以取走 } } consumer { while( true ) { wait( items ); // 等待缓冲区有资源可以使用 wait( mutex ); // 保证在consume时不会有其他线程访问缓冲区 // consume buf.pop( out ); // 将buf[out]位置的的资源取走 out = ( out + 1 ) % 10; signal( mutex ); // 唤醒的顺序可以不同 signal( space ); // 通知缓冲区有空闲位置 } }
不能将线程里两个wait的顺序调换否则会出现死锁。例如(调换后),将consumer的两个wait调换,在producer发出signal信号后,如果producer线程此时再次获得运行机会,执行完了
wait(space)
,此时,另一个consumer线程获得运行机会,执行了 wait(mutex) ,如果此时缓冲区为空,那么consumer将会阻塞在wait(items)
,而producer也会因为无法获得锁的所有权所以阻塞在wait(mutex)
,这样两个线程都在阻塞,也就造成了死锁。
四、代码实现(C++)
#include <iostream> #include <string.h> #include <pthread.h> #include <unistd.h> using namespace std; int current = 0; // producer运行加1,consumer运行减1 int buf[10]; int in = 0, out = 0; int items = 0, spaces = 10; bool flag; // 标记线程结束运行 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t notfull = PTHREAD_COND_INITIALIZER; // 缓冲区不满 pthread_cond_t notempty = PTHREAD_COND_INITIALIZER; // 缓冲区不空 void *producer( void *arg ) { while( flag ) { pthread_mutex_lock( &mutex ); // 为保证条件变量不会因为多线程混乱,所以先加锁 while( !spaces ) { // 避免“惊群”效应,避免因其他线程实现得到事件而导致该线程“假醒” pthread_cond_wait( ¬full, &mutex ); } buf[in] = current++; in = ( in + 1 ) % 10; items++; spaces--; printf( "producer %zu , current = %d\n", pthread_self(), current ); for( int i = 0; i < 10; i++ ) { printf( "%-4d", buf[i] ); } printf( "\n\n" ); pthread_cond_signal( ¬empty ); pthread_mutex_unlock( &mutex ); } pthread_exit( NULL ); } void *consumer( void *arg ) { while( flag ) { pthread_mutex_lock( &mutex ); while( !items ) { pthread_cond_wait( ¬empty, &mutex ); } buf[out] = -1; out = ( out + 1 ) % 10; current--; items--; spaces++; printf( "consumer %zu , current = %d\n", pthread_self(), current ); for( int i = 0; i < 10; i++ ) { printf( "%-4d", buf[i] ); } printf( "\n\n" ); pthread_cond_signal( ¬full ); pthread_mutex_unlock( &mutex ); } pthread_exit( NULL ); } int main() { memset( buf, -1, sizeof(buf) ); flag = true; pthread_t pro[10], con[10]; int i = 0; for( int i = 0; i < 10; i++ ) { pthread_create( &pro[i], NULL, producer, NULL ); pthread_create( &con[i], NULL, consumer, NULL ); } sleep(1); // 让线程运行一秒 flag = false; for( int i = 0; i < 10; i++ ) { pthread_join( pro[i], NULL ); pthread_join( con[i], NULL ); } return 0; }
五、 互斥锁与条件变量的使用比较
我们会发现,在伪代码中强调了条件变量在前,互斥锁在后,而到了代码实现时又变成了先加互斥锁,再进行循环
pthread_cond_wait()
。这不是自相矛盾吗?其实,在伪代码中的
wait()
、signal()
就是操作系统中的PV操作,而PV操作定义就保证了该语句是原子操作,因此在wait条件变量改变的时候不会因为多进程同时访问共享资源造成混乱,所以为了保证线程间的同步,需要先加条件变量,等事件可使用后才进行线程相应的操作,此时互斥锁的作用是保证共享资源不会被其他线程访问。而在代码实现中,
signal()
对应的时pthread_cond_wait()
函数,该函数在执行时会有三步:- 解开当前的锁
- 等待条件变量达到所需要的状态
- 再把之前解开的锁加锁
为了实现将
pthread_cond_wait()
变成原子操作,就需要在该函数之前添加互斥锁。因为pthread_cond_wait()
可以解锁,也就不会发生像伪代码所说的死锁问题。相反,如果像伪代码那样先使用条件变量,后加锁,则会造成多个线程同时访问共享资源的问题,造成数据的混乱。
欢迎关注微信公众号,不定时分享学习资料与学习笔记,感谢!
-
生产者与消费者问题C语言实现
2018-05-27 10:51:24生产者-消费者问题是典型的PV操作问题,假设系统中有一个比较大的缓冲池,生产者的任务是只要缓冲池未满就可以将生产出的产品放入其中,而消费者的任务是只要缓冲池未空就可以从缓冲池中拿走产品。缓冲池被占用时,...实验目的
①实现生产者—消费者问题的模拟,以便更好的理解此经典进程同步问题。生产者-消费者问题是典型的PV操作问题,假设系统中有一个比较大的缓冲池,生产者的任务是只要缓冲池未满就可以将生产出的产品放入其中,而消费者的任务是只要缓冲池未空就可以从缓冲池中拿走产品。缓冲池被占用时,任何进程都不能访问。②每一个生产者都要把自己生产的产品放入缓冲池,每个消费者从缓冲池中取走产品消费。在这种情况下,生产者消费者进程同步,因为只有通过互通消息才知道是否能存入产品或者取走产品。他们之间也存在互斥,即生产者消费者必须互斥访问缓冲池,即不能有两个以上的进程同时进行。
实验原理
在同一个进程地址空间内执行两个线程。生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻挡,直到新的物品被生产出来。生产者流程图
消费者流程图
注意点
①本次实验是关于生产者与消费者之间互斥和同步的问题。问题的是指是P、V操作,实验设一个共享缓冲区,生产者和消费者互斥的使用,当一个线程使用缓冲区的时候,另一个让其等待直到前一个线程释放缓冲区为止。
②生产者与消费者是一个与现实有关的经验问题,通过此原理举一反三可以解决其他类似的问题。 通过本实验设计,我们对操作系统的P、V进一步的认识,深入的了解P、V操作的实质和其重要性。课本的理论知识进一步阐述了现实中的实际问题。
③Linux环境下编写变异C语言有Windows稍有不同,注意在Linux中编译带有线程#include <stdio.h> #include <pthread.h> #include <windows.h> #define N 100 #define true 1 #define producerNum 10 #define consumerNum 5 #define sleepTime 1000 typedef int semaphore; typedef int item; item buffer[N] = {0}; int in = 0; int out = 0; int proCount = 0; semaphore mutex = 1, empty = N, full = 0, proCmutex = 1; void * producer(void * a){ while(true){ while(proCmutex <= 0); proCmutex--; proCount++; printf("生产一个产品ID%d, 缓冲区位置为%d\n",proCount,in); proCmutex++; while(empty <= 0){ printf("缓冲区已满!\n"); } empty--; while(mutex <= 0); mutex--; buffer[in] = proCount; in = (in + 1) % N; mutex++; full++; Sleep(sleepTime); } } void * consumer(void *b){ while(true){ while(full <= 0){ printf("缓冲区为空!\n"); } full--; while(mutex <= 0); mutex--; int nextc = buffer[out]; buffer[out] = 0;//消费完将缓冲区设置为0 out = (out + 1) % N; mutex++; empty++; printf("\t\t\t\t消费一个产品ID%d,缓冲区位置为%d\n", nextc,out); Sleep(sleepTime); } } int main() { pthread_t threadPool[producerNum+consumerNum]; int i; for(i = 0; i < producerNum; i++){ pthread_t temp; if(pthread_create(&temp, NULL, producer, NULL) == -1){ printf("ERROR, fail to create producer%d\n", i); exit(1); } threadPool[i] = temp; }//创建生产者进程放入线程池 for(i = 0; i < consumerNum; i++){ pthread_t temp; if(pthread_create(&temp, NULL, consumer, NULL) == -1){ printf("ERROR, fail to create consumer%d\n", i); exit(1); } threadPool[i+producerNum] = temp; }//创建消费者进程放入线程池 void * result; for(i = 0; i < producerNum+consumerNum; i++){ if(pthread_join(threadPool[i], &result) == -1){ printf("fail to recollect\n"); exit(1); } }//运行线程池 return 0; }
更多内容访问omegaxyz.com
网站所有代码采用Apache 2.0授权
网站文章采用知识共享许可协议BY-NC-SA4.0授权
© 2018 • OmegaXYZ-版权所有 转载请注明出处 -
Java实现Kafka生产者和消费者的示例
2021-01-05 10:06:08Java实现Kafka生产者和消费者的示例文章持续更新,微信搜索「万猫学社」第一时间阅读。
关注后回复「电子书」,免费获取12本Java必读技术书籍。Kafka简介
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。
方式一:kafka-clients
引入依赖
在pom.xml文件中,引入kafka-clients依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.1</version> </dependency>
生产者
创建一个KafkaProducer的生产者实例:
@Configuration public class Config { public final static String bootstrapServers = "127.0.0.1:9092"; @Bean(destroyMethod = "close") public KafkaProducer<String, String> kafkaProducer() { Properties props = new Properties(); //设置Kafka服务器地址 props.put("bootstrap.servers", bootstrapServers); //设置数据key的序列化处理类 props.put("key.serializer", StringSerializer.class.getName()); //设置数据value的序列化处理类 props.put("value.serializer", StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(props); return producer; } }
在Controller中进行使用:
@RestController @Slf4j public class Controller { @Autowired private KafkaProducer<String, String> kafkaProducer; @RequestMapping("/kafkaClientsSend") public String send() { String uuid = UUID.randomUUID().toString(); RecordMetadata recordMetadata = null; try { //将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中 recordMetadata = kafkaProducer.send(new ProducerRecord<>("one-more-topic", uuid)).get(); log.info("recordMetadata: {}", recordMetadata); log.info("uuid: {}", uuid); } catch (Exception e) { log.error("send fail, uuid: {}", uuid, e); } return uuid; } }
消费者
创建一个KafkaConsumer的消费者实例:
@Configuration public class Config { public final static String groupId = "kafka-clients-group"; public final static String bootstrapServers = "127.0.0.1:9092"; @Bean(destroyMethod = "close") public KafkaConsumer<String, String> kafkaConsumer() { Properties props = new Properties(); //设置Kafka服务器地址 props.put("bootstrap.servers", bootstrapServers); //设置消费组 props.put("group.id", groupId); //设置数据key的反序列化处理类 props.put("key.deserializer", StringDeserializer.class.getName()); //设置数据value的反序列化处理类 props.put("value.deserializer", StringDeserializer.class.getName()); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); //订阅名称为“one-more-topic”的Topic的消息 kafkaConsumer.subscribe(Arrays.asList("one-more-topic")); return kafkaConsumer; } }
在Controller中进行使用:
@RestController @Slf4j public class Controller { @Autowired private KafkaConsumer<String, String> kafkaConsumer; @RequestMapping("/receive") public List<String> receive() { 从Kafka服务器中的名称为“one-more-topic”的Topic中消费消息 ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1)); List<String> messages = new ArrayList<>(records.count()); for (ConsumerRecord<String, String> record : records.records("one-more-topic")) { String message = record.value(); log.info("message: {}", message); messages.add(message); } return messages; } }
方式二:spring-kafka
使用kafka-clients需要我们自己创建生产者或者消费者的bean,如果我们的项目基于SpringBoot构建,那么使用spring-kafka就方便多了。
引入依赖
在pom.xml文件中,引入spring-kafka依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.12.RELEASE</version> </dependency>
生产者
在application.yml文件中增加配置:
spring: kafka: #Kafka服务器地址 bootstrap-servers: 127.0.0.1:9092 producer: #设置数据value的序列化处理类 value-serializer: org.apache.kafka.common.serialization.StringSerializer
在Controller中注入KafkaTemplate就可以直接使用了,代码如下:
@RestController @Slf4j public class Controller { @Autowired private KafkaTemplate<String, String> template; @RequestMapping("/springKafkaSend") public String send() { String uuid = UUID.randomUUID().toString(); //将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中 this.template.send("one-more-topic", uuid); log.info("uuid: {}", uuid); return uuid; } }
消费者
在application.yml文件中增加配置:
spring: kafka: #Kafka服务器地址 bootstrap-servers: 127.0.0.1:9092 consumer: #设置数据value的反序列化处理类 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
创建一个可以被Spring框架扫描到的类,并且在方法上加上@KafkaListener注解,就可以消费消息了,代码如下:
@Component @Slf4j public class Receiver { @KafkaListener(topics = "one-more-topic", groupId = "spring-kafka-group") public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { String message = (String) kafkaMessage.get(); log.info("message: {}", message); } } }
文章持续更新,微信搜索「万猫学社」第一时间阅读。
关注后回复「电子书」,免费获取12本Java必读技术书籍。 -
秒杀多线程第十篇 生产者消费者问题
2012-05-21 10:18:09生产者消费者问题是一个著名的线程同步问题,该问题描述如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将...继经典线程同步问题之后,我们来看看生产者消费者问题及读者写者问题。生产者消费者问题是一个著名的线程同步问题,该问题描述如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将它生产的产品放入一个缓冲区中,消费者可以从缓冲区中取走产品进行消费,显然生产者和消费者之间必须保持同步,即不允许消费者到一个空的缓冲区中取产品,也不允许生产者向一个已经放入产品的缓冲区中再次投放产品。
这个生产者消费者题目不仅常用于操作系统的课程设计,也常常在程序员和软件设计师考试中出现。并且在计算机考研的专业课考试中也是一个非常热门的问题。因此现在就针对这个问题进行详细深入的解答。
首先来简化问题,先假设生产者和消费者都只有一个,且缓冲区也只有一个。这样情况就简便多了。
第一.从缓冲区取出产品和向缓冲区投放产品必须是互斥进行的。可以用关键段和互斥量来完成。
第二.生产者要等待缓冲区为空,这样才可以投放产品,消费者要等待缓冲区不为空,这样才可以取出产品进行消费。并且由于有二个等待过程,所以要用二个事件或信号量来控制。
考虑这二点后,代码很容易写出来。另外为了美观起见,将消费者的输出颜色设置为彩色,有关如何在控制台下设置彩色输出请参阅《VC 控制台颜色设置》。
//1生产者 1消费者 1缓冲区 //使用二个事件,一个表示缓冲区空,一个表示缓冲区满。 //再使用一个关键段来控制缓冲区的访问 #include <stdio.h> #include <process.h> #include <windows.h> //设置控制台输出颜色 BOOL SetConsoleColor(WORD wAttributes) { HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE); if (hConsole == INVALID_HANDLE_VALUE) return FALSE; return SetConsoleTextAttribute(hConsole, wAttributes); } const int END_PRODUCE_NUMBER = 10; //生产产品个数 int g_Buffer; //缓冲区 //事件与关键段 CRITICAL_SECTION g_cs; HANDLE g_hEventBufferEmpty, g_hEventBufferFull; //生产者线程函数 unsigned int __stdcall ProducerThreadFun(PVOID pM) { for (int i = 1; i <= END_PRODUCE_NUMBER; i++) { //等待缓冲区为空 WaitForSingleObject(g_hEventBufferEmpty, INFINITE); //互斥的访问缓冲区 EnterCriticalSection(&g_cs); g_Buffer = i; printf("生产者将数据%d放入缓冲区\n", i); LeaveCriticalSection(&g_cs); //通知缓冲区有新数据了 SetEvent(g_hEventBufferFull); } return 0; } //消费者线程函数 unsigned int __stdcall ConsumerThreadFun(PVOID pM) { volatile bool flag = true; while (flag) { //等待缓冲区中有数据 WaitForSingleObject(g_hEventBufferFull, INFINITE); //互斥的访问缓冲区 EnterCriticalSection(&g_cs); SetConsoleColor(FOREGROUND_GREEN); printf(" 消费者从缓冲区中取数据%d\n", g_Buffer); SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE); if (g_Buffer == END_PRODUCE_NUMBER) flag = false; LeaveCriticalSection(&g_cs); //通知缓冲区已为空 SetEvent(g_hEventBufferEmpty); Sleep(10); //some other work should to do } return 0; } int main() { printf(" 生产者消费者问题 1生产者 1消费者 1缓冲区\n"); printf(" -- by MoreWindows( http://blog.csdn.net/MoreWindows ) --\n\n"); InitializeCriticalSection(&g_cs); //创建二个自动复位事件,一个表示缓冲区是否为空,另一个表示缓冲区是否已经处理 g_hEventBufferEmpty = CreateEvent(NULL, FALSE, TRUE, NULL); g_hEventBufferFull = CreateEvent(NULL, FALSE, FALSE, NULL); const int THREADNUM = 2; HANDLE hThread[THREADNUM]; hThread[0] = (HANDLE)_beginthreadex(NULL, 0, ProducerThreadFun, NULL, 0, NULL); hThread[1] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL); WaitForMultipleObjects(THREADNUM, hThread, TRUE, INFINITE); CloseHandle(hThread[0]); CloseHandle(hThread[1]); //销毁事件和关键段 CloseHandle(g_hEventBufferEmpty); CloseHandle(g_hEventBufferFull); DeleteCriticalSection(&g_cs); return 0; }
运行结果如下所示:
可以看出生产者与消费者已经是有序的工作了。
然后再对这个简单生产者消费者问题加大难度。将消费者改成2个,缓冲池改成拥有4个缓冲区的大缓冲池。
如何来思考了这个问题了?首先根据上面分析的二点,可以知道生产者和消费者由一个变成多个的影响不大,唯一要注意的是缓冲池变大了,回顾一下《秒杀多线程第八篇 经典线程同步 信号量Semaphore》中的信号量,不难得出用二个信号量就可以解决这种缓冲池有多个缓冲区的情况——用一个信号量A来记录为空的缓冲区个数,另一个信号量B记录非空的缓冲区个数,然后生产者等待信号量A,消费者等待信号量B就可以了。因此可以仿照上面的代码来实现复杂生产者消费者问题,示例代码如下:
//1生产者 2消费者 4缓冲区 #include <stdio.h> #include <process.h> #include <windows.h> //设置控制台输出颜色 BOOL SetConsoleColor(WORD wAttributes) { HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE); if (hConsole == INVALID_HANDLE_VALUE) return FALSE; return SetConsoleTextAttribute(hConsole, wAttributes); } const int END_PRODUCE_NUMBER = 8; //生产产品个数 const int BUFFER_SIZE = 4; //缓冲区个数 int g_Buffer[BUFFER_SIZE]; //缓冲池 int g_i, g_j; //信号量与关键段 CRITICAL_SECTION g_cs; HANDLE g_hSemaphoreBufferEmpty, g_hSemaphoreBufferFull; //生产者线程函数 unsigned int __stdcall ProducerThreadFun(PVOID pM) { for (int i = 1; i <= END_PRODUCE_NUMBER; i++) { //等待有空的缓冲区出现 WaitForSingleObject(g_hSemaphoreBufferEmpty, INFINITE); //互斥的访问缓冲区 EnterCriticalSection(&g_cs); g_Buffer[g_i] = i; printf("生产者在缓冲池第%d个缓冲区中投放数据%d\n", g_i, g_Buffer[g_i]); g_i = (g_i + 1) % BUFFER_SIZE; LeaveCriticalSection(&g_cs); //通知消费者有新数据了 ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL); } printf("生产者完成任务,线程结束运行\n"); return 0; } //消费者线程函数 unsigned int __stdcall ConsumerThreadFun(PVOID pM) { while (true) { //等待非空的缓冲区出现 WaitForSingleObject(g_hSemaphoreBufferFull, INFINITE); //互斥的访问缓冲区 EnterCriticalSection(&g_cs); SetConsoleColor(FOREGROUND_GREEN); printf(" 编号为%d的消费者从缓冲池中第%d个缓冲区取出数据%d\n", GetCurrentThreadId(), g_j, g_Buffer[g_j]); SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE); if (g_Buffer[g_j] == END_PRODUCE_NUMBER)//结束标志 { LeaveCriticalSection(&g_cs); //通知其它消费者有新数据了(结束标志) ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL); break; } g_j = (g_j + 1) % BUFFER_SIZE; LeaveCriticalSection(&g_cs); Sleep(50); //some other work to do ReleaseSemaphore(g_hSemaphoreBufferEmpty, 1, NULL); } SetConsoleColor(FOREGROUND_GREEN); printf(" 编号为%d的消费者收到通知,线程结束运行\n", GetCurrentThreadId()); SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE); return 0; } int main() { printf(" 生产者消费者问题 1生产者 2消费者 4缓冲区\n"); printf(" -- by MoreWindows( http://blog.csdn.net/MoreWindows ) --\n\n"); InitializeCriticalSection(&g_cs); //初始化信号量,一个记录有产品的缓冲区个数,另一个记录空缓冲区个数. g_hSemaphoreBufferEmpty = CreateSemaphore(NULL, 4, 4, NULL); g_hSemaphoreBufferFull = CreateSemaphore(NULL, 0, 4, NULL); g_i = 0; g_j = 0; memset(g_Buffer, 0, sizeof(g_Buffer)); const int THREADNUM = 3; HANDLE hThread[THREADNUM]; //生产者线程 hThread[0] = (HANDLE)_beginthreadex(NULL, 0, ProducerThreadFun, NULL, 0, NULL); //消费者线程 hThread[1] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL); hThread[2] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL); WaitForMultipleObjects(THREADNUM, hThread, TRUE, INFINITE); for (int i = 0; i < THREADNUM; i++) CloseHandle(hThread[i]); //销毁信号量和关键段 CloseHandle(g_hSemaphoreBufferEmpty); CloseHandle(g_hSemaphoreBufferFull); DeleteCriticalSection(&g_cs); return 0; }
运行结果如下图所示:
输出结果证明各线程的同步和互斥已经完成了。
至此,生产者消费者问题已经圆满的解决了,下面作个总结:
1.首先要考虑生产者与消费者对缓冲区操作时的互斥。
2.不管生产者与消费者有多少个,缓冲池有多少个缓冲区。都只有二个同步过程——分别是生产者要等待有空缓冲区才能投放产品,消费者要等待有非空缓冲区才能去取产品。
下一篇《秒杀多线程第十一篇读者写者问题》将介绍另一个著名的同步问题——读者写者问题,欢迎大家再来参阅。
转载请标明出处,原文地址:http://blog.csdn.net/morewindows/article/details/7577591
如果觉得本文对您有帮助,请点击‘顶’支持一下,您的支持是我写作最大的动力,谢谢。
-
Java多种方式解决生产者消费者问题(十分详细)
2018-08-16 08:40:50生产者消费者问题 一、问题描述 生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,... -
Java多线程技术~生产者和消费者问题
2020-06-16 14:15:18Java多线程技术~生产者和消费者问题 本文是上一篇文章的后续,详情点击该连接 线程通信 应用场景:生产者和消费者问题 假设仓库中只能存放一件产品,生产者将生产出来的... -
Java实现生产者和消费者模式
2019-10-26 18:39:07从生产者、消费者模型会出现的问题出发,谈了一下对生产者、消费者模型的理解,并配有完整的代码实现。 -
信号量与生产者消费者问题
2017-01-19 15:06:44生产者—消费者问题 生产者—消费者题型在各类考试(考研、程序员证书、程序员面试笔试、期末考试)很常见,原因之一是生产者—消费者题型在实际的并发程序(多进程、多线程)设计中很常见;之二是这种题型综合性较... -
生产者消费者问题 C++实现
2018-06-29 17:00:03生产者消费者问题 C++实现 知识准备 thread 介绍 成员类 成员函数 sleep_for 介绍 mutex 介绍 成员函数 unique_lock 介绍 成员函数 codition_variable 介绍 成员函数 代码示例 生产者消费者问题 ... -
生产者消费者模式
2018-09-06 23:21:22生产者消费者模式 -
生产者与消费者模型
2020-09-26 18:40:01生产者-消费者模式是一个经典的多线程设计模式。 在生产者-消费者模式中,通常有两类线程,即若干个生产者和消费者线程。 生产者线程负责提交用户请求 消费者线程负责处理生产者提交的任务。 内存缓冲区 缓存生产者... -
实现生产者消费者的三种方式
2019-10-14 21:22:03文章目录wait/notify的消息通知机制预备知识wait/notify消息通知潜在的一些问题notify过早通知等待wait的条件发生变化假死状态wait/notifyAll实现生产者-消费者使用Lock中Condition的await/signalAll实现生产者-消费... -
生产者消费者模型
2017-02-20 23:33:26一、什么是生产者消费者模型 在实际的开发中,经常会碰到如下场景:某个模块负责生产数据,这些数据由另一个模块来负责处理。产生数据的模块就形象的称为生产者,而处理数据的模块就称为消费者。只有生产者和消费... -
java 生产者消费者模式
2017-04-27 15:27:01java的生产者消费者模式,有三个部分组成,一个是生产者,一个是消费者,一个是缓存。 这么做有什么好处呢? 1.解耦(去依赖),如果是消费者直接调用生产者,那如果生产者的代码变动了,消费者的代码也需要随之变动 ... -
生产者/消费者模式的理解及实现
2018-05-31 10:05:37★简介 生产者消费者模式并不是GOF提出的23种设计模式之一,23种设计模式都是建立在面向对象的基础之上的,但其实面向过程的编程中也有很多高效的编程模式,生产者消费者模式便是其中之一,它是我们编程过程中最... -
生产者消费者问题
2019-09-03 23:19:34临界区:对临界资源进行访问的那段代码称为临界区。为了互斥访问临界资源,每个进程在进入临界区...使用一个容器来存放物品,只有当容器中还有空位置时,生产者才可以生产商品;只有当容器中物品数量大于0时,消费者... -
生产者-消费者问题
2019-07-17 19:39:45生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。解决生产者 / 消费者问题的方法可分为两类:(1)采用某种机制... -
PV操作案例(一)——生产者、消费者;多生产者、多消费者;吸烟者问题
2020-03-27 12:19:13文章目录生产者、消费者问题描述PV操作题目分析步骤多生产者、多消费者问题描述问题分析吸烟者问题描述问题分析 生产者、消费者问题描述 系统中有一组生产者进程和一组消费者进程,生产者进程每次生成一个产品放入... -
java实现生产者和消费者
2019-06-19 11:29:10本篇博文主要介绍如何使用java来实现简单的生产者和消费者 来帮助大家简单的入门理解生产者和消费者模式 一、对生产者消费者的理解 生产者消费者模式是并发、多线程编程中经典的设计模式。 简单来看,就是一个类... -
Java——生产者-消费者问题(GUI)
2020-05-01 19:02:43设计一个模拟仿真“生产者-消费者”问题的解决过程及方法的程序 -
使用Condition实现生产者消费者
2018-11-10 11:25:21使用Condtion实现生产者消费者可以精确控制唤醒生产者还是消费者线程 与synchronized的等待唤醒机制相比Condition具有更多的灵活性以及精确性,这是因为notify()在唤醒线程时是随机(同一个锁),而Condition则可通过... -
生产者消费者C语言
2019-04-30 21:27:05操作系统生产者消费者问题C语言 #include <stdio.h> #include <pthread.h> #include <windows.h> #define N 100 #define true 1 #define producerNum 15 #define consumerN... -
Kafka消费者生产者实例
2017-07-30 18:22:56为了更为直观展示Kafka的消息生产消费的过程,我会从基于Console和基于Application两个方面介绍使用实例。...由于主要介绍如何使用Kafka快速构建生产者消费者实例,所以不会涉及Kafka内部的原理。一个基于Kafk -
深入分析Kafka生产者和消费者
2020-10-29 22:43:00深入Kafka生产者和消费者Kafka生产者消息发送的流程发送方式生产者属性配置序列化器分区器Kafka消费者消费者群组消费者属性配置消费者基础概念消费者核心概念 Kafka生产者 消息发送的流程 生产者每发送一条消息需要... -
生产者消费者 & 读者写者
2016-07-17 20:50:51消费者生产者 读者写者 -
生产者与消费者分析
2018-08-04 10:06:53众所周知大型网站都是并发支持,用到最多的也是生产者与消费者模式。 生产消费者模型 生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者...