-
2018-08-05 17:22:28
抄自维基百科 :
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。
本文用一个ItemRepository类表示产品仓库,其中包含一个数组和两个坐标表示的环形队列、一个std::mutex成员、用来保证每次只被一个线程读写操作 (为了保证打印出来的消息是一行一行的,在它空闲的时候也借用的这个互斥量╮(╯▽╰)╭)、两个std::condition_variable表示队列不满和不空的状态,进而保证生产的时候不满,消耗的时候不空。
#pragma once #include <chrono>//std::chrono #include <mutex>//std::mutex,std::unique_lock,std::lock_guard #include <thread>//std::thread #include <condition_variable>//std::condition_variable #include <iostream>//std::cout,std::endl #include <map>//std::map namespace MyProducerToConsumer { static const int gRepositorySize = 10;//total size of the repository static const int gItemNum = 97;//number of products to produce std::mutex produce_mtx, consume_mtx;//mutex for all the producer thread or consumer thread std::map<std::thread::id, int> threadPerformance;//records of every thread's producing/consuming number struct ItemRepository {//repository class int m_ItemBuffer[gRepositorySize];//Repository itself (as a circular queue) int m_ProducePos;//rear position of circular queue int m_ConsumePos;//head position of circular queue std::mutex m_mtx;//mutex for operating the repository std::condition_variable m_RepoUnfull;//indicating that this repository is unfull(then producers can produce items) std::condition_variable m_RepoUnempty;//indicating that this repository is unempty(then consumers can produce items) }gItemRepo; void ProduceItem(ItemRepository *ir, int item) { std::unique_lock <std::mutex>ulk(ir->m_mtx); while ((ir->m_ProducePos + 1) % gRepositorySize == ir->m_ConsumePos) {//full(spare one slot for indicating) std::cout << "Reposity is full. Waiting for consumers..." << std::endl; ir->m_RepoUnfull.wait(ulk);//unlocking ulk and waiting for unfull condition } //when unfull ir->m_ItemBuffer[ir->m_ProducePos++] = item;//procude and shift std::cout << "Item No." << item << " produced successfully by " <<std::this_thread::get_id()<<"!" << std::endl; threadPerformance[std::this_thread::get_id()]++; if (ir->m_ProducePos == gRepositorySize)//loop ir->m_ProducePos = 0; ir->m_RepoUnempty.notify_all();//item produced, so it's unempty; notify all consumers } int ConsumeItem(ItemRepository *ir) { std::unique_lock<std::mutex>ulk(ir->m_mtx); while (ir->m_ConsumePos == ir->m_ProducePos) {//empty std::cout << "Repository is empty.Waiting for producing..." << std::endl; ir->m_RepoUnempty.wait(ulk); } int item = ir->m_ItemBuffer[ir->m_ConsumePos++]; std::cout << "Item No." << item << " consumed successfully by " <<std::this_thread::get_id()<<"!" << std::endl; threadPerformance[std::this_thread::get_id()]++; if (ir->m_ConsumePos == gRepositorySize) ir->m_ConsumePos = 0; ir->m_RepoUnfull.notify_all();//item consumed, so it's unempty; notify all consumers return item; } void ProducerThread() { static int produced = 0;//static variable to indicate the number of produced items while (1) { std::this_thread::sleep_for(std::chrono::milliseconds(10));//sleep long enough in case it runs too fast for other threads to procude std::lock_guard<std::mutex>lck(produce_mtx);//auto unlock when break produced++; if (produced > gItemNum)break; gItemRepo.m_mtx.lock(); std::cout << "Producing item No." << produced << "..." << std::endl; gItemRepo.m_mtx.unlock(); ProduceItem(&gItemRepo, produced); } gItemRepo.m_mtx.lock(); std::cout << "Producer thread " << std::this_thread::get_id() << " exited." << std::endl; gItemRepo.m_mtx.unlock(); } void ConsumerThread() { static int consumed = 0; while (1) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::lock_guard<std::mutex>lck(consume_mtx); consumed++; if (consumed > gItemNum)break; gItemRepo.m_mtx.lock(); std::cout << "Consuming item available..." << std::endl; gItemRepo.m_mtx.unlock(); ConsumeItem(&gItemRepo); } gItemRepo.m_mtx.lock(); std::cout << "Consumer thread " << std::this_thread::get_id() << " exited." << std::endl; gItemRepo.m_mtx.unlock(); } void InitItemRepository(ItemRepository* ir) { ir->m_ConsumePos = 0; ir->m_ProducePos = 0; } void Run() { InitItemRepository(&gItemRepo); std::thread thdConsume[11]; std::thread thdProduce[11]; for (auto& t : thdConsume)t = std::thread(ConsumerThread); for (auto& t : thdProduce)t = std::thread(ProducerThread); for (auto& t : thdConsume)t.join(); for (auto& t : thdProduce)t.join(); for (auto& iter : threadPerformance)cout << iter.first << ":" << iter.second << endl; } }
更多相关内容 -
多进程同步方法解决生产者-消费者问题(linux线程实现)
2021-05-15 04:00:14设计要求:(1)每个生产者和消费者对有界缓冲区进行操作后,即时显示有界缓冲区的全部内容,当前指针位置和生产者/消费者线程的标识符.(2)生产者和消费者各有两个以上.(3)多个生产者或多个消费者之间须有共享对缓冲区... -
生产者消费者问题 C++实现
2018-06-29 17:00:03生产者消费者问题 C++实现 知识准备 thread 介绍 成员类 成员函数 sleep_for 介绍 mutex 介绍 成员函数 unique_lock 介绍 成员函数 codition_variable 介绍 成员函数 代码示例 生产者消费者问题 ...文章目录
生产者消费者问题 C++实现
知识准备
thread
介绍
- 定位于头文件的class thread
- 表示单个执行线程, 没有两个thread对象会表示同一个线程
- 不可复制构造
- 不可复制赋值
成员类
成员函数
-
get_id
- 返回线程的id
-
hardware_concurrency
- 返回实现支持的并发线程数
-
join
- 等待线程完成其执行
sleep_for
介绍
-
定义域头文件
-
声明
-
template< class Rep, class Period > void sleep_for( const std::chrono::duration<Rep, Period>& sleep_duration );
-
sleep_duration : 要睡眠的时长
-
mutex
介绍
-
定义于头文件
-
mutex
提供排他性非递归所有权语义:
成员函数
-
lock
- 锁定互斥, 若互斥不可用则堵塞
-
unlock
- 解锁互斥
unique_lock
介绍
-
定义于
-
声明
template< class Mutex > class unique_lock;
成员函数
- lock
- 锁定关联互斥
- unlock
- 解锁关联互斥
- mutex
- 返回指向关联互斥的指针
codition_variable
介绍
- 定义于头文件 <condition_variable>
- 声明
class condition_variable;
- 有意修改变量的线程必须:
-
- 获得std:: mutex(通过std::unique_lock)
- 在保有锁时进行修改
- 执行 notify_one 或 notify_all
-
- 任何有意在
std::condition_variable
上等待的线程必须-
- 获得
std::unique_lock <std::mutex>
- 执行 wait 、 wait_for 或 wait_until ,等待操作自动释放互斥,并悬挂线程的执行
- 线程被唤醒,且自动重获得互斥
- 获得
-
- **
std::condition_variable
只可与std::unique_lock<std::mutex>
一同使用;
成员函数
-
notify_one
- 通知一个等待线程
-
notify_all
- 通知所有等待线程
-
wait
- 阻塞当前进程, 直至被唤醒
-
wait_for
- 阻塞当前线程,直到条件变量被唤醒,或到指定时限时长后
- wait_until
-
阻塞当前线程,直到条件变量被唤醒,或直到抵达指定时间点
-
代码示例
// operator_system.cpp: 定义控制台应用程序的入口点。 // #include "stdafx.h" #include<iostream> #include <mutex> #include <condition_variable> #include <windows.h> #include <thread> using namespace std; static const int buffer_size = 10; // 缓存大小 static const int item_total = 100; //总共要生产 item_total个item // 缓存结构体, 使用循环队列当做缓存 struct Buffer { int buffer[buffer_size]; size_t read_position; // 当前读位置 size_t write_position; // 当前写位置 mutex mtx; // 读写互斥 //条件变量 condition_variable not_full; condition_variable not_empty; }buffer_res; typedef struct Buffer Buffer; void porduce_item(Buffer *b, int item) { unique_lock<mutex> lock(b->mtx);//设置互斥锁 while(((b->write_position + 1) % buffer_size) == b->read_position) { //当前缓存已经满了 cout << "buffer is full now, producer is wating....." << endl; (b->not_full).wait(lock); // 等待缓存非full } // 向缓存中添加item (b->buffer)[b->write_position] = item; (b->write_position)++; // 若到达最后一个, 写位置置位0 if (b->write_position == buffer_size) b->write_position = 0; (b->not_empty).notify_all(); lock.unlock(); } int consume_item(Buffer *b) { int data; unique_lock <mutex> lock(b->mtx); while (b->write_position == b->read_position) { // 当前buffer 为空 cout << "buffer is empty , consumer is waiting....." << endl; (b->not_empty).wait(lock); } data = (b->buffer)[b->read_position]; (b->read_position)++; if (b->read_position >= buffer_size) b->read_position = 0; (b->not_full).notify_all(); lock.unlock(); return data; } //生产者任务 void producer() { for (int i = 1; i<= item_total;i++) { cout << "prodece the " << i << "^th item ..." << endl; porduce_item(&buffer_res, i); } } //消费者任务 void consumer() { static int cnt = 0; while(1) { Sleep(1); int item = consume_item(&buffer_res); cout << "consume the " << item << "^th item" << endl; if (++cnt == item_total) break; } } //初始化 buffer void init_buffer(Buffer *b) { b->write_position = 0; b->read_position = 0; } int main() { init_buffer(&buffer_res); thread prodece(producer); thread consume(consumer); prodece.join(); consume.join(); getchar(); }
查看其他精彩文章
深入分析HashMap
AOP核心原理和SpringAOP
10分钟入门SpringAOP -
生产者消费者问题c++实现
2009-06-23 08:55:57生产者消费者经典问题,这个程序选择用c++实现。希望我们能够分享。。。。。 -
生产者消费者问题C++实现
2020-10-11 11:25:33生产者消费者问题C++实现 2020/10/11 11:23 问题 设计C/C++程序(可以嵌入汇编语言),以忙等待方式实现信号量及其P、V操作。利用你实现的信号量,实现生产者-消费者问题。给出实现方法、主要源代码和测试结果。 代码 ...生产者消费者问题C++实现
2020/10/11 11:23
问题
设计C/C++程序(可以嵌入汇编语言),以忙等待方式实现信号量及其P、V操作。利用你实现的信号量,实现生产者-消费者问题。给出实现方法、主要源代码和测试结果。
代码
# include<iostream> # include<thread> # include<vector> # include<mutex> # include<condition_variable> # include<queue> #include <ctime> #include <windows.h> using namespace std; # define PRODUCT_SIZE 5//生产者数量 # define CUSTOMER_SIZE 20//消费者数量 #define MAX_SIZE 10//最大产品数量 mutex mut;//互斥锁 condition_variable con;//条件变量 queue<int> que;//队列,模拟缓冲区 void Producter() { while (true) { Sleep(10); srand(int (time(NULL))); std::unique_lock <std::mutex> lck(mut); /*当 std::condition_variable对象的某个wait 函数被调用的时候, 它使用 std::unique_lock(通过 std::mutex) 来锁住当前线程。 当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable 对象上调用*/ while (que.size() > MAX_SIZE)//当队列已满时等待,不再生产 { con.wait(lck);//P操作 } int data = rand();//随机产生数字代表商品的生产 que.push(data);//将数据推入队列代表商品加入缓冲区 cout << this_thread::get_id() << "生产了产品:" << data << endl; Sleep(500); con.notify_all();//唤醒所有等待的进程,即V操作 } } void Customer() { while (true) { std::unique_lock <std::mutex> lck(mut); while (que.empty())//当队列为空时等待,不再消费 { //用condition_variable对象实现对缓冲区的互斥操作 con.wait(lck);//P操作 } cout << this_thread::get_id() << "消费了产品:" << que.front() << endl; Sleep(500); que.pop(); con.notify_all();//唤醒所有等待的进程,即V操作 } } int main() { vector<thread> threadPoll; //创建生产者和消费者 for (int i = 0; i < PRODUCT_SIZE; ++i) { threadPoll.push_back(thread(Producter)); } for (int i = 0; i < PRODUCT_SIZE + CUSTOMER_SIZE; ++i) { threadPoll.push_back(thread(Customer)); } for (int i = 0; i < PRODUCT_SIZE + CUSTOMER_SIZE; ++i) { threadPoll[i].join();//原始线程等到新线程执行完毕后再销毁 } return 0; }
运行截图
1.利用条件变量condition_variable实现PV操作,当condition_variable对象的某个wait 函数被调用的时候,它使用 unique_lock(通过 mutex) 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的condition_variable 对象上调用
2.用产生的随机数字代表商品的生产,加入队列表示加入缓冲区
3.线程销毁采用join方式,等待新线程执行完毕再销毁原始线程 -
生产者消费者代码(C++版)
2015-02-13 12:58:42用Posix信号量, Posix互斥量, 解决生产者消费者问题(c++版) -
生产者消费者问题 C++
2012-02-23 09:29:42对于生产者消费者的问题,一个简单的处理。 -
生产者消费者的c++代码实现
2011-11-29 23:03:07计算机操作系统经典的生产者消费者问题c++高级语言的实现。编程入门必备。 -
生产者-消费者问题.cpp
2020-03-26 16:31:15生产者——消费者问题实际上是相互合作进程关系的一种抽象。该类问题不允许消费者进程到一个空缓冲区中取产品,同时也不允许生产者进程到一个已满且还没被取走的缓冲区中投放产品。 使用一个数组来表示具有n个(0,1... -
多进程同步解决生产者消费者问题(c++源码)
2012-01-08 11:23:35用多进程同步方法解决生产者—消费者问题(c++源码) 1、每个生产者和消费者对有界缓冲区进行操作后,即时显示有界缓冲区的全部内容,当前指针位置和生产者/消费者进程的标识符。 2、生产者和消费者各有两个以上。 3... -
生产者-消费者问题的模拟实现(课设含源代码).doc
2021-03-03 19:14:03用进程同步方法解决“生产者-消费者”问题,C或C++语言实现。 1、设计目的 通过研究进程并发和信号量机制,实现生产者-消费者问题的并发控制。 2、设计要求 1)每个生产者和消费者对有界缓冲区进行操作后,即时显示... -
Qt C++11 生产者消费者模式类
2017-07-11 21:56:15使用Qt 和 C++11 的std::mutex 和 std::condition_variable 实现一个演示生产者消费者模式的Qt工程。 -
多生产者与多消费者问题c++源码
2010-01-05 07:57:33多生产者,多消费者问题源代码多生产者,多消费者问题源代码多生产者,多消费者问题源代码多生产者,多消费者问题源代码多生产者,多消费者问题源代码多生产者,多消费者问题源代码多生产者,多消费者问题源代码多... -
C++实现生产者和消费者模型
2020-09-17 08:50:04C++实现生产者和消费者模型 C++实现生产者和消费者模型1、实现细节1、单生产者-单消费者模型参考 C++实现生产者和消费者模型 1、实现细节 具体的实现逻辑是构建一个queue来存储生产的数据,queue不满时可以生产,不...C++实现生产者和消费者模型
1、实现细节
- 具体的实现逻辑是构建一个queue来存储生产的数据,queue不满时可以生产,不空时可以消费。
- 对于这个队列,采用阻塞队列的实现思路。
- 先实现构造函数,初始化一个unique_lock供condition_variable使用。
- 如何在类里面使用unique_lock等需要初始化,并且初始化会加锁的对象。这要研究下。我的理解是构造列表初始化,然后函数体里unlock。
- 对于条件变量,申请两个,分别控制consumer和producer。
- 然后就是入和出队列的细节。
- 首先加锁。
- 循环判断一下目前的队列情况,对于各自的特殊情况(队满和队空)进行处理。
- 唤醒一个线程来处理特殊情况。
- 等待处理完毕。
- 处理入和出队列操作。
- 最后释放锁。
2、单生产者-单消费者模型
- 单生产者-单消费者模型中只有一个生产者和一个消费者,
- 生产者不停地往产品库中放入产品,
- 消费者则从产品库中取走产品,
- 产品库容积有限制,只能容纳一定数目的产品,
- 如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,
- 相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。
C++11实现单生产者单消费者模型的代码如下:
#include <unistd.h> #include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> static const int bufSize = 10; // Item buffer size. static const int ProNum = 20; // How many items we plan to produce. struct resource { int buf[bufSize]; // 产品缓冲区, 配合 read_pos 和 write_pos 模型环形队列. size_t read_pos; // 消费者读取产品位置. size_t write_pos; // 生产者写入产品位置. std::mutex mtx; // 互斥量,保护产品缓冲区 std::condition_variable not_full; // 条件变量, 指示产品缓冲区不为满. std::condition_variable not_empty; // 条件变量, 指示产品缓冲区不为空. } instance; // 产品库全局变量, 生产者和消费者操作该变量. typedef struct resource resource; void Producer(resource *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while (((ir->write_pos + 1) % bufSize) == ir->read_pos) { // item buffer is full, just wait here. std::cout << "Producer is waiting for an empty slot...\n"; (ir->not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生. } (ir->buf)[ir->write_pos] = item; // 写入产品. (ir->write_pos)++; // 写入位置后移. if (ir->write_pos == bufSize) // 写入位置若是在队列最后则重新设置为初始位置. ir->write_pos = 0; (ir->not_empty).notify_all(); // 通知消费者产品库不为空. } int Consumer(resource *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); // item buffer is empty, just wait here. while (ir->write_pos == ir->read_pos) { std::cout << "Consumer is waiting for items...\n"; (ir->not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生. } data = (ir->buf)[ir->read_pos]; // 读取某一产品 (ir->read_pos)++; // 读取位置后移 if (ir->read_pos >= bufSize) // 读取位置若移到最后,则重新置位. ir->read_pos = 0; (ir->not_full).notify_all(); // 通知消费者产品库不为满. return data; // 返回产品. } void ProducerTask() // 生产者任务 { for (int i = 1; i <= ProNum; ++i) { // sleep(1); std::cout << "Produce the " << i << "^th item..." << std::endl; Producer(&instance, i); // 循环生产 ProNum 个产品. } } void ConsumerTask() // 消费者任务 { static int cnt = 0; while (1) { sleep(1); int item = Consumer(&instance); // 消费一个产品. std::cout << "Consume the " << item << "^th item" << std::endl; if (++cnt == ProNum) break; // 如果产品消费个数为 ProNum, 则退出. } } void Initresource(resource *ir) { ir->write_pos = 0; // 初始化产品写入位置. ir->read_pos = 0; // 初始化产品读取位置. } int main() { Initresource(&instance); std::thread producer(ProducerTask); // 创建生产者线程. std::thread consumer(ConsumerTask); // 创建消费之线程. producer.join(); consumer.join(); }
3、单生产者-多消费者模型
与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器,代码如下:
#include <unistd.h> #include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> static const int bufSize = 8; // Item buffer size. static const int ProNum = 30; // How many items we plan to produce. struct resource { int buf[bufSize]; // 产品缓冲区, 配合 read_pos 和 write_pos 模型环形队列. size_t read_pos; // 消费者读取产品位置. size_t write_pos; // 生产者写入产品位置. size_t item_counter; std::mutex mtx; // 互斥量,保护产品缓冲区 std::mutex item_counter_mtx; std::condition_variable not_full; // 条件变量, 指示产品缓冲区不为满. std::condition_variable not_empty; // 条件变量, 指示产品缓冲区不为空. } instance; // 产品库全局变量, 生产者和消费者操作该变量. typedef struct resource resource; void Producer(resource *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while (((ir->write_pos + 1) % bufSize) == ir->read_pos) { // item buffer is full, just wait here. std::cout << "Producer is waiting for an empty slot...\n"; (ir->not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生. } (ir->buf)[ir->write_pos] = item; // 写入产品. (ir->write_pos)++; // 写入位置后移. if (ir->write_pos == bufSize) // 写入位置若是在队列最后则重新设置为初始位置. ir->write_pos = 0; (ir->not_empty).notify_all(); // 通知消费者产品库不为空. lock.unlock(); // 解锁. } int Consumer(resource *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); // item buffer is empty, just wait here. while (ir->write_pos == ir->read_pos) { std::cout << "Consumer is waiting for items...\n"; (ir->not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生. } data = (ir->buf)[ir->read_pos]; // 读取某一产品 (ir->read_pos)++; // 读取位置后移 if (ir->read_pos >= bufSize) // 读取位置若移到最后,则重新置位. ir->read_pos = 0; (ir->not_full).notify_all(); // 通知消费者产品库不为满. lock.unlock(); // 解锁. return data; // 返回产品. } void ProducerTask() // 生产者任务 { for (int i = 1; i <= ProNum; ++i) { // sleep(1); std::cout << "Producer thread " << std::this_thread::get_id() << " producing the " << i << "^th item..." << std::endl; Producer(&instance, i); // 循环生产 ProNum 个产品. } std::cout << "Producer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void ConsumerTask() // 消费者任务 { bool ready_to_exit = false; while (1) { sleep(1); std::unique_lock<std::mutex> lock(instance.item_counter_mtx); if (instance.item_counter < ProNum) { int item = Consumer(&instance); ++(instance.item_counter); std::cout << "Consumer thread " << std::this_thread::get_id() << " is consuming the " << item << "^th item" << std::endl; } else ready_to_exit = true; if (ready_to_exit == true) break; } std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void Initresource(resource *ir) { ir->write_pos = 0; // 初始化产品写入位置. ir->read_pos = 0; // 初始化产品读取位置. ir->item_counter = 0; } int main() { Initresource(&instance); std::thread producer(ProducerTask); std::thread consumer1(ConsumerTask); std::thread consumer2(ConsumerTask); std::thread consumer3(ConsumerTask); std::thread consumer4(ConsumerTask); producer.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join(); }
4、多生产者-单消费者模型
与单生产者和单消费者模型不同的是,多生产者-单消费者模型中可以允许多个生产者同时向产品库中放入产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器,代码如下:
#include <unistd.h> #include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> static const int bufSize = 8; // Item buffer size. static const int ProNum = 20; // How many items we plan to produce. struct resource { int buf[bufSize]; // 产品缓冲区, 配合 read_pos 和 write_pos 模型环形队列. size_t read_pos; // 消费者读取产品位置. size_t write_pos; // 生产者写入产品位置. size_t item_counter; std::mutex mtx; // 互斥量,保护产品缓冲区 std::mutex item_counter_mtx; std::condition_variable not_full; // 条件变量, 指示产品缓冲区不为满. std::condition_variable not_empty; // 条件变量, 指示产品缓冲区不为空. } instance; // 产品库全局变量, 生产者和消费者操作该变量. typedef struct resource resource; void Producer(resource *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while (((ir->write_pos + 1) % bufSize) == ir->read_pos) { // item buffer is full, just wait here. std::cout << "Producer is waiting for an empty slot...\n"; (ir->not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生. } (ir->buf)[ir->write_pos] = item; // 写入产品. (ir->write_pos)++; // 写入位置后移. if (ir->write_pos == bufSize) // 写入位置若是在队列最后则重新设置为初始位置. ir->write_pos = 0; (ir->not_empty).notify_all(); // 通知消费者产品库不为空. } int Consumer(resource *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); // item buffer is empty, just wait here. while (ir->write_pos == ir->read_pos) { std::cout << "Consumer is waiting for items...\n"; (ir->not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生. } data = (ir->buf)[ir->read_pos]; // 读取某一产品 (ir->read_pos)++; // 读取位置后移 if (ir->read_pos >= bufSize) // 读取位置若移到最后,则重新置位. ir->read_pos = 0; (ir->not_full).notify_all(); // 通知消费者产品库不为满. return data; // 返回产品. } void ProducerTask() // 生产者任务 { bool ready_to_exit = false; while (1) { sleep(1); std::unique_lock<std::mutex> lock(instance.item_counter_mtx); if (instance.item_counter < ProNum) { ++(instance.item_counter); Producer(&instance, instance.item_counter); std::cout << "Producer thread " << std::this_thread::get_id() << " is producing the " << instance.item_counter << "^th item" << std::endl; } else ready_to_exit = true; if (ready_to_exit == true) break; } std::cout << "Producer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void ConsumerTask() // 消费者任务 { static int cnt = 0; while (1) { sleep(1); cnt++; if (cnt <= ProNum) { int item = Consumer(&instance); // 消费一个产品. std::cout << "Consumer thread " << std::this_thread::get_id() << " is consuming the " << item << "^th item" << std::endl; } else break; // 如果产品消费个数为 ProNum, 则退出. } std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void Initresource(resource *ir) { ir->write_pos = 0; // 初始化产品写入位置. ir->read_pos = 0; // 初始化产品读取位置. ir->item_counter = 0; } int main() { Initresource(&instance); std::thread producer1(ProducerTask); std::thread producer2(ProducerTask); std::thread producer3(ProducerTask); std::thread producer4(ProducerTask); std::thread consumer(ConsumerTask); producer1.join(); producer2.join(); producer3.join(); producer4.join(); consumer.join(); }
5、多生产者-多消费者模型
该模型可以说是前面两种模型的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。
#include <unistd.h> #include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> static const int bufSize = 8; // Item buffer size. static const int ProNum = 20; // How many items we plan to produce. struct resource { int buf[bufSize]; // 产品缓冲区, 配合 read_pos 和 write_pos 模型环形队列. size_t read_pos; // 消费者读取产品位置. size_t write_pos; // 生产者写入产品位置. size_t pro_item_counter; size_t con_item_counter; std::mutex mtx; // 互斥量,保护产品缓冲区 std::mutex pro_mtx; std::mutex con_mtx; std::condition_variable not_full; // 条件变量, 指示产品缓冲区不为满. std::condition_variable not_empty; // 条件变量, 指示产品缓冲区不为空. } instance; // 产品库全局变量, 生产者和消费者操作该变量. typedef struct resource resource; void Producer(resource *ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); while (((ir->write_pos + 1) % bufSize) == ir->read_pos) { // item buffer is full, just wait here. std::cout << "Producer is waiting for an empty slot...\n"; (ir->not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生. } (ir->buf)[ir->write_pos] = item; // 写入产品. (ir->write_pos)++; // 写入位置后移. if (ir->write_pos == bufSize) // 写入位置若是在队列最后则重新设置为初始位置. ir->write_pos = 0; (ir->not_empty).notify_all(); // 通知消费者产品库不为空. } int Consumer(resource *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); // item buffer is empty, just wait here. while (ir->write_pos == ir->read_pos) { std::cout << "Consumer is waiting for items...\n"; (ir->not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生. } data = (ir->buf)[ir->read_pos]; // 读取某一产品 (ir->read_pos)++; // 读取位置后移 if (ir->read_pos >= bufSize) // 读取位置若移到最后,则重新置位. ir->read_pos = 0; (ir->not_full).notify_all(); // 通知消费者产品库不为满. return data; // 返回产品. } void ProducerTask() // 生产者任务 { bool ready_to_exit = false; while (1) { sleep(1); std::unique_lock<std::mutex> lock(instance.pro_mtx); if (instance.pro_item_counter < ProNum) { ++(instance.pro_item_counter); Producer(&instance, instance.pro_item_counter); std::cout << "Producer thread " << std::this_thread::get_id() << " is producing the " << instance.pro_item_counter << "^th item" << std::endl; } else ready_to_exit = true; lock.unlock(); if (ready_to_exit == true) break; } std::cout << "Producer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void ConsumerTask() // 消费者任务 { bool ready_to_exit = false; while (1) { sleep(1); std::unique_lock<std::mutex> lock(instance.con_mtx); if (instance.con_item_counter < ProNum) { int item = Consumer(&instance); ++(instance.con_item_counter); std::cout << "Consumer thread " << std::this_thread::get_id() << " is consuming the " << item << "^th item" << std::endl; } else ready_to_exit = true; lock.unlock(); if (ready_to_exit == true) break; } std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void Initresource(resource *ir) { ir->write_pos = 0; // 初始化产品写入位置. ir->read_pos = 0; // 初始化产品读取位置. ir->pro_item_counter = 0; ir->con_item_counter = 0; } int main() { Initresource(&instance); std::thread producer1(ProducerTask); std::thread producer2(ProducerTask); std::thread producer3(ProducerTask); std::thread producer4(ProducerTask); std::thread consumer1(ConsumerTask); std::thread consumer2(ConsumerTask); std::thread consumer3(ConsumerTask); std::thread consumer4(ConsumerTask); producer1.join(); producer2.join(); producer3.join(); producer4.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join(); return 0; }
参考
1、https://www.cnblogs.com/haippy/p/3252092.html
2、https://blog.csdn.net/qq_41681241/article/details/86708303
3、https://blog.csdn.net/h_wulingfei/article/details/104897449 -
C++实现生产者消费者
2021-10-22 15:09:19利用C++内置函数实现生产者-消费者功能: private: std::deque<T> queue_; //缓存队列 size_t size_limit_; //缓存队列大小的限制 std::mutex lock_; //互斥锁 std::condition_variable empty_, full_; //... -
进程同步之生产者-消费者问题(C++)
2019-08-15 09:43:11有一群生产者进程在生产产品,并将这些产品提供给消费者进程进行消费,生产者进程和消费者进程可以并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池,生产者进程需要将所生产的产品放到缓冲区中(+1操作),... -
C++ 多线程通信方式简介并结合生产者-消费者模式代码实现
2018-10-15 14:36:37C++ 多线程通信方式简介并结合生产者-消费者模式代码实现 -
多线程实现生产者-消费者问题——C++版本
2022-04-03 13:33:531)生产者进程每次生产1个产品(产品数量大于4时停止生产); 2)消费者进程每次消耗2个产品; #include <iostream> #include <thread> #include <mutex> #include <condition_variable> ... -
生产者 消费者 模式 c++
2014-04-25 07:25:13生产者 消费者 模式 c++ 算是老外写的一个使用demo 可以参考一下 -
生产者消费者问题.c
2019-11-26 08:08:44操作系统课程生产者消费者问题模拟程序,程序相对简单,通过这个模拟程序能够帮助学习者会更好的学习os,供有需要的人学习使用。 -
生产消费者模式的C++实现
2014-11-27 10:12:18利用C++实现的生产消费者模式,每个生产者和消费者都在不同的线程中异步执行。 -
《操作系统实验》C++实现生产者-消费者问题
2021-01-10 11:16:42生产者-消费者问题1 实验内容及要求2 实验环境3 实验设计3.1 问题描述3.2 基本思想3.2.1 生产者线程3.2.2 消费者线程3.2.3 同步的实现3.3 数据结构4 实验源码5 实验总结 1 实验内容及要求 1、模拟生产者—消费者问题... -
生产者-消费者C++实现(一)
2019-06-03 15:21:48和同学闲聊,谈到多线程中的经典问题——生产者-消费者问题:要求实现两个线程,一个线程负责对全局变量进行+1操作,一个线程负责打印更新后的值。自己从事code多年,自以为对多线程了解深入,不假思索,写出了下面... -
生产者与消费者问题(C++实现PV操作)
2009-10-20 21:23:56在很多网站上都没找到,所以自己做了个发出来,希望大家给出意见 -
C++多线程编程——线程同步:生产者与消费者问题
2018-05-24 21:00:46生产者与消费者问题是多线程中非常经典的线程同步问题,这些线程必须按照一定的生产率和消费率来访问共享缓冲区。实现方法是:设置两个信号量full和empty,其中full表示消费缓冲区的个数,empty表示生产缓冲区的个数... -
适用于C ++ 11的快速多生产者,多消费者,无锁定并发队列-C/C++开发
2021-05-26 20:05:25注意:如果您需要的只是一个单一生产者,单一消费者队列,那么我也可以选择其中之一。 特色快如闪电般的快速pe moodycamel :: ConcurrentQueue C ++的工业级无锁队列。 注意:如果您需要的只是一个单一生产者,单一... -
(Linux C)利用多进程或多线程模拟实现生产者/消费者问题
2012-12-30 19:36:37Linux C语言 实现利用多进程或多线程模拟实现生产者/消费者问题。 (站在巨人的肩膀上) -
多生产者多消费者问题C/C++实现
2020-08-19 10:56:22多生产者多消费者问题C/C++实现 /* 这里使用到几个常用函数: #include<windows.h> //信号量 HANDLE WINAPI CreateSemaphore( _In_opt_ LPSECURITY_ATTRIBUTES lpSemaphoreAttributes //安全属性,如果为... -
生产者跟消费者问题(C++实现)
2018-06-26 00:26:49T 表示有T个生产者。同时也有T个消费者。 然后每个生产者总共生产T个数据。 每个消费者总共消费T个数据。 N表示最大随机数大数值 那个信号量的实现,我是直接copy课件上的。 代码 #include &lt;... -
生产者消费者问题
2016-12-22 16:49:01创建5个进程,其中两个进程为生产者进程,3个进程为消费者进程。一个生产者进程试图不断地在一个缓冲中写入大写字母,另一个生产者进程试图不断地在缓冲中写入小写字母。3个消费者不断地从缓冲中读取一个字符并输出...