生产者消费者问题 订阅
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。 展开全文
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
信息
外文名
Producer-consumer problem
别    称
有限缓冲问题
中文名
生产者消费者问题
产生对象
生产者”和“消费者
特征点分类
解决办法要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。
收起全文
精华内容
下载资源
问答
  • 生产者消费者问题

    千次阅读 2017-05-28 11:03:04
    2、理解不同进程之间的通信机制,掌握生产者消费者问题。 二、实验内容  生产者消费者问题(需要Windows版本和Linux版本) • 一个大小为3的缓冲区,初始为空 • 2个生产者 – 随机等待一段时间,往缓冲区添加...

    一、实验目的

    1、对于不同系统的平台学会使用调用系统函数,掌握进程创建的方法和步骤。

    2、理解不同进程之间的通信机制,掌握生产者消费者问题。

    二、实验内容

           生产者消费者问题(需要Windows版本和Linux版本)

    •   一个大小为3的缓冲区,初始为空

    •   2个生产者

    – 随机等待一段时间,往缓冲区添加数据

    – 若缓冲区已满,等待消费者取走数据后再添加

    – 重复6次

    •   3个消费者

    – 随机等待一段时间,从缓冲区读取数据

    – 若缓冲区为空,等待生产者添加数据后再读取

    – 重复4次

    说明:

    1、显示每次添加和读取数据的时间及缓冲区的状态

    2、生产者和消费者用进程模拟。

    三、实验环境

    1、Windows环境下使用Dev-c++和Notepad++;

    2、Linux环境是在虚拟机中装Ubuntu15.10;如图1所示

    图1

    3、在Linux中使用Linux自带的文本编辑器(gedit)进行编辑,在终端进行编译和运行。

    四、实验方法与步骤

    1、在Windows下实现

    (1)、首先理解在理论中学习的生产者消费者问题,然后根据本实验内容的要求,将已知条件转化为编译器能识别的宏或者变量,在此我定义为宏;(如图2所示)

    图2

    (2)、定义缓冲区的结构,此缓冲区用共享内存实现的。(如图3所示)

    图3

    (3)、生产者与消费者需要用进程模拟,就需要考虑进程之间的通信;同时生产者往缓冲区写入数据、消费者从缓冲区读取数据,不同进程之间同步进行,牵涉到进程通信和共享内存,解决这个问题的方案比较多,此处我才用的是文件映射。(如图4所示) 


    图4

    (4)、编写通用函数:本实验中的随机等待时间函数get_random()、生产者往缓冲区写入数据的函数get_letter()、通过参数传递进程的ID创建当前进程的克隆进程函数StartClone()、创建共享内存并返回创建后的文件句柄的函数MakeSharedFile();

    (5)、MakeShareFile()函数思路:首先利用CreateFileMapping()函数创建临时的文件映射对象;如果文件映射成功,利用MapViewOfFile()函数把文件映射的一个视图映射到当前进程的地址空间、并返回文件映射的起始地址;若起始地址有效,将前一步的地址所指向的存储空间清0;在当前进程的地址空间中解除对一个文件映射对象的映射;最终返回临时的文件映射对象句柄。(如图5所示)

    图5

    (6)、编写主函数

    对于主进程:

    ①、利用上述所写的MakeSharedFile()函数创建数据文件;

    ②、利用OpenFileMapping()函数打开文件映射,利用MapViewOfFile()函数把打开的文件映射到主进程的地址空间;

    ③、如果第②步映射成功,对主进程进行处理,将缓冲区的头尾指针都指向0,然后创建信号量,利用UnmapViewOfFile()函数在当前进程的地址空间中解除对缓冲区文件的映射;

    ④、如果第②步映射失败,输出提示信息;

    ⑤、利用CloseHandle()函数关闭文件映射对象句柄;

    ⑥、利用StartClone()函数创建2个生产者进程和3个消费者进程;

    ⑦、利用WaitForSingleObject()函数和CloseHandle()函数等待5个子进程运行结束,并关闭各个子进程的句柄。

    对于2个生产者进程:

    ①、利用OpenFileMapping()函数打开文件映射,利用MapViewOfFile()函数把打开的文件映射到此进程的地址空间;

    ②、如果第①步映射成功,对此进程进行处理,利用OpenSemaphore()函数打开信号量、并将打开的信号量赋值给共享内存中的信号量;每个生产者各工作6次,利用WaitForSingleObject()函数等待一个子进程结束并睡眠一个随机的时间,利用get_letter()函数随机产生一个字母放入环形缓冲区中,并将当前位置记录下来,此处从0开始;取当前的系统时间,并将每一次写入后的环形缓冲区状态输出,并显示进程的操作;利用UnmapViewOfFile()函数将当前进程句柄关闭;

    ③、如果第①步映射失败,输出提示信息;

    ④、利用CloseHandle()函数将第①步打开的文件映射对象句柄关闭。

    对于3个消费者进程:

    ①、利用OpenFileMapping()函数打开文件映射,利用MapViewOfFile()函数把打开的文件映射到此进程的地址空间;

    ②、如果第①步映射成功,对此进程进行处理,利用OpenSemaphore()函数打开信号量、并将打开的信号量赋值给共享内存中的信号量;每个消费者各工作4次,利用WaitForSingleObject()函数等待一个子进程结束并睡眠一个随机的时间,取出环形缓冲区中的数据,若缓冲区为空(头尾指针指向同一个位置),并将当前位置记录下来,并赋值给缓冲区结构体中的is_empty成员,取当前的系统时间,并将每一次读取后的环形缓冲区状态输出,并显示进程的操作;利用ReleaseSemaphore()函数释放此进程的信号量,并利用UnmapViewOfFile()函数将当前进程句柄关闭;

    ③、如果第①步映射失败,输出提示信息;

    ④、利用CloseHandle()函数将第①步打开的文件映射对象句柄关闭。

    (7)、各个子进程运行结束后,关闭主进程的句柄。

    2、在Linux下实现

    (1)、首先理解在理论中学习的生产者消费者问题,然后根据本实验内容的要求,将已知条件转化为编译器能识别的宏或者变量,在此我定义为宏;如下所示:

    //2个生产者,每个生产者工作6次

    #defineNeed_Producer 2

    #defineWorks_Producer 6

     

    //3个消费者,每个消费者工作4次

    #defineNeed_Customer 3

    #defineWorks_Customer 4

     

    //缓冲区为3

    #definebuffer_len 3

    #defineMYBUF_LEN (sizeof(struct mybuffer))

     

    #define SHM_MODE0600

    #defineSEM_ALL_KEY 1234

    #defineSEM_EMPTY 0

    #define SEM_FULL1

    (2)、定义缓冲区的结构(如图6所示)

    图6

    (3)、编写通用函数:本实验中的随机等待时间函数get_random()、生产者往缓冲区写入数据的函数get_letter()、生产者与消费者的PV操作(如图7所示)

    图7

    (4)、编写主函数:

    对于主函数:

    ①、创建一个信号量集合,若返回的信号量集合的标识号不小于0,输出提示;

    ②、对信号量进行控制操作;

    ③、申请一个共享内存区,成功返回共享内存区的标识,若此标识小于0,则申请共享内存区失败并输出相应提示;

    ④、将共享段附加到申请通信的进程空间;成功时返回共享内存附加到进程空间的虚地址,失败时返回-1,若返回-1则输出相应提示;

    ⑤、初始化环形缓冲区中的数据成员。

    对于2个生产者进程:

    ①、创建新的进程,若所创建的进程标识符小于0,则创建进程失败,并输出相应提示;

    ②、若此进程为子进程,将共享段附加到申请通信的进程空间;成功时返回共享内存附加到进程空间的虚地址,失败时返回-1,并输出相应提示;

    ③、2个生产者进程各执行6次,利用上面的P操作,然后睡眠一段随机时间;利用get_letter()函数得到一个随机的字母并写入环形缓冲区,将is_empty设置为0;取当前的系统时间,对每一次写入后、输出当前缓冲区的状态,并显示进程的操作;将第①步创建的信号量执行V操作;

    ④、将共享段与进程之间解除连接。

    对于3个消费者进程:

    ①、创建新的进程,若所创建的进程标识符小于0,则创建进程失败,并输出相应提示;

    ②、若此进程为子进程,将共享段附加到申请通信的进程空间;成功时返回共享内存附加到进程空间的虚地址,失败时返回-1,并输出相应提示;

    ③、3个消费者进程各执行4次,利用上面的P操作,然后睡眠一段随机时间;读取环形缓冲区中的数据,如头尾指针指向同一单元,则将当前单元索引赋值给is_empty;取当前的系统时间,对每一次读取后、输出当前缓冲区的状态,并显示进程的操作;将第①步创建的信号量执行V操作;

    ④、将共享段与进程之间解除连接。

    (5)、主进程等待所有子进程运行结束,将共享段与进程之间解除连接。

    五、实验结果

    1、Windows下的实验截图(如图8所示)


    图8 

    2、Linux下的实验截图(如图9所示)



    图9 

    六、实验分析与总结

    Windows内存管理器使用局部对象来实现共享内存。文件中的字节一对一映射到虚地址空间。读内存的时候实际上是从文件获取数据,修改内存最终将写回到文件。进程间通信、共享数据有很多种方法,文件映射是常用的一种方法。因为mapping对象在系统中是全局的,一个进程创建的Mapping对象可以从另外一个进程打开,映射视图就是进程间共享的内存了。一般在共享的内存数据量比较大时,选择使用文件映射进行共享。使用CreateFileMapping()创建文件映射,OpenFileMapping()打开文件映射。调用 MapViewOfFile()将文件映射到进程的地址空间。

    共享主存段为进程提供了直接通过主存进行通信的有效手段。使用shmget()系统调用实现共享主存段的创建,shmget()返回共享内存区的 ID。对于已经申请到的共享段,进程需把它附加到自己的虚拟空间中才能对其进行读写。使用shmat()将共享内存附加到进程的地址空间。程序退出时调用 shmdt()将共享存储区从本地进程中解除连接,但它不删除共享存储区。shmctl()调用可实现多种共享存储区操作,包括删除和获取信息。在UNIX系统中,一个或多个信号量构成一个信号量集合。使用信号量机制可以实现进程之间的同步和互斥,允许并发进程一次对一组信号量进行相同或不同的操作。每个P、V操作不限于减1或加1,而是可以加减任何整数。在进程终止时,系统可根据需要自动消除所有被进程操作过的信号量的影响。 semget()调用建立一个信号量集合,semctl()调用对信号量执行控制操作,进而实现P、V操作。

    七、实验源代码

    Windows版本
    // 名称:ProducerAndCustomer.h
    // 描述:生产者消费者问题
    // 作者:野狼
    // 日期:2017.3.27
    
    #ifndef __PRODUCERANDCUSTOMER_H
    #define __PRODUCERANDCUSTOMER_H
    #include <stdio.h>
    #include <stdlib.h>
    #include <time.h>
    #include <windows.h>
    #include <unistd.h>
    
    //1个主进程序号记为0
    #define mainNum 0
    //2个生产者序号(1~2)
    #define Producer_Num_from 1
    #define Producer_Num_to 2
    //3个消费者序号(3~5)
    #define Customer_Num_from 3
    #define Customer_Num_to 5
    
    //2个生产者,每个生产者工作6次
    #define Need_Producer 2
    #define Works_Producer 6
    
    //3个消费者,每个消费者工作4次
    #define Need_Customer 3
    #define Works_Customer 4
    
    //缓冲区为3
    #define buffer_len 3
    
    #define SHM_NAME "BUFFER"
    
    //文件映射对象句柄
    static HANDLE handleFileMapping;
    
    //子进程句柄数组
    static HANDLE subProHandleArray[5+1];
    
    //缓冲区的结构
    struct mybuffer
    {
    	char str[buffer_len];
    	int head;
    	int tail;
    	int is_empty;
    };
    
    //共享主存区的结构
    struct shareMemory
    {
    	struct mybuffer data;
    	HANDLE semEmpty;
    	HANDLE semFull;
    };
    
    //得到1000以内的一个随机数
    int get_random()
    {
    	int digit;
    	srand((unsigned)(GetCurrentProcessId() + time(NULL)));
    	digit = rand() % 1000;
    	return digit;
    }
    
    //得到A~Z的一个随机字母
    char get_letter()
    {
    	char letter;
    	srand((unsigned)(getpid() + time(NULL)));
    	letter = (char)((rand() % 26) + 'A');
    	return letter;
    }
    
    //通过参数传递进程的ID创建当前进程的克隆进程
    void StartClone(int subProID)
    {
    	char szFilename[MAX_PATH];
    	char szCmdLine[MAX_PATH];
    	STARTUPINFO si;
    	PROCESS_INFORMATION pi;
    	//获得当前可执行文件名,hModule为NULL返回当前可执行文件的路径名;存放给定模块的路径和文件名;缓冲区大小
    	GetModuleFileName(NULL, szFilename, MAX_PATH);
    	sprintf(szCmdLine, "\"%s\" %d",szFilename, subProID);
    	
    	memset(&si, 0, sizeof(STARTUPINFO));
    	si.cb = sizeof(STARTUPINFO);
    	
    	//创建子进程
    	BOOL bCreateOK = CreateProcess(szFilename, szCmdLine, NULL, NULL, FALSE, 0, NULL, NULL, &si, &pi);
    	//将新创建进程的句柄赋值给进程ID为subProID的子进程
    	subProHandleArray[subProID] = pi.hProcess;	
    	return;
    }
    
    //创建共享内存
    HANDLE MakeSharedFile()
    {
    	//创建临时的文件映射对象(用INVALID_HANDLE_VALUE代替真正的文件句柄)
    	HANDLE handleFileMapping = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, sizeof(struct shareMemory), SHM_NAME);
        if (handleFileMapping == NULL || handleFileMapping == INVALID_HANDLE_VALUE)
    	{
    		printf("创建文件映射失败:%d\n",GetLastError());
    		return;
    	}
    	if (handleFileMapping != INVALID_HANDLE_VALUE)
    	{
    		//把文件映射对象的一个视图映射到当前进程的地址空间,返回值为文件映射的起始地址
    		LPVOID pData = MapViewOfFile(handleFileMapping, FILE_MAP_ALL_ACCESS, 0, 0, 0);//高32位,低32位,整个文件映射
    		if (pData == NULL)
    		{
    			printf("创建文件映射视图失败:%d\n",GetLastError());
    			return;
    		}
    		if (pData != NULL)
    		{
    			//将指定的存储空间清0
    			ZeroMemory(pData, sizeof(struct shareMemory));
    		}
    		//在当前进程的地址空间中解除对一个文件映射对象的映射
    		UnmapViewOfFile(pData);
    	}
    	return handleFileMapping;
    }
    
    #endif
    // 名称:ProducerAndCustomer.c
    // 描述:生产者消费者问题
    // 作者:野狼
    // 日期:2017.3.27
    
    #include "ProducerAndCustomer.h"
    
    int main(int argc, char * argv[])
    {
    	int i, j, k;
    	int nextIndex = 1;	//下一个要执行的进程序号 
    	int curProNum = mainNum;
    	char lt;
    	SYSTEMTIME time;
    	
    	//printf("缓冲区大小为:%d.\n",buffer_len);
    	//printf("%d个生产者,分别写入%d次.\n",Need_Producer, Works_Producer);
    	//printf("%d个消费者,分别读取%d次.\n",Need_Customer, Works_Customer);
    	
    	//如果有参数,就作为子进程ID
    	if (argc > 1)
    	{
    		sscanf(argv[1], "%d", &curProNum);
    	}
    	
    	//对于主进程
    	if (curProNum == mainNum)
    	{
    		printf("主进程开始运行!\n");
    		
    		//创建共享内存 
    		handleFileMapping = MakeSharedFile();
    		//映射视图
    		HANDLE hFileMapping = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, SHM_NAME);
    		if (hFileMapping == NULL)
    		{
    			printf("打开文件映射失败:%d\n",GetLastError());
    			return;
    		}
    		LPVOID pFile = MapViewOfFile(hFileMapping, FILE_MAP_ALL_ACCESS, 0, 0, 0);
    		if (pFile == NULL)
    		{
    			printf("文件映射视图失败:%d\n",GetLastError());
    			return;
    		}
    		else
    		{
    			struct shareMemory *sham = (struct shareMemory*)(pFile);
    			sham->data.head = 0;
    			sham->data.tail = 0;
    			sham->semEmpty = CreateSemaphore(NULL, buffer_len, buffer_len, "SEM_EMPTY");
    			sham->semFull = CreateSemaphore(NULL, 0, buffer_len, "SEM_FULL");
    			//取消文件映射
    			UnmapViewOfFile(pFile);
    			pFile = NULL;
    		}
    		CloseHandle(hFileMapping);
    		
    		//创建5个子进程
    		while (nextIndex <= 5)
    		{
    			StartClone(nextIndex++);
    		}
    		
    		//等待子进程运行结束
    		for (k=1; k<6; k++)
    		{
    			WaitForSingleObject(subProHandleArray[k], INFINITE);
    			CloseHandle(subProHandleArray[k]);
    		}
    		
    		//输出结束信息
    		printf("主进程运行结束!\n");
    	}
    	
    	//2个生产者进程
    	else if (curProNum >= Producer_Num_from && curProNum <= Producer_Num_to)
    	{
    		//映射视图
    		HANDLE hFileMapping = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, SHM_NAME);
    		if (hFileMapping == NULL)
    		
    		{
    			printf("打开文件映射失败:%d\n",GetLastError());
    			return;
    		}
    		LPVOID pFile = MapViewOfFile(hFileMapping, FILE_MAP_ALL_ACCESS, 0, 0, 0);
    		if (pFile == NULL)
    		{
    			printf("文件映射视图失败:%d\n",GetLastError());
    			return;
    		}
    		else
    		{
    			struct shareMemory *sham = (struct shareMemory*)(pFile);
    			sham->semEmpty = OpenSemaphore(SEMAPHORE_ALL_ACCESS,FALSE,"SEM_EMPTY");
    			sham->semFull = OpenSemaphore(SEMAPHORE_ALL_ACCESS,FALSE,"SEM_FULL");
    			
    			for (i=0; i<Works_Producer; i++)
    			{
    				WaitForSingleObject(sham->semEmpty, INFINITE);
    				Sleep(get_random());
    				sham->data.str[sham->data.tail] = lt = get_letter();
    				sham->data.tail = (sham->data.tail + 1) % buffer_len;
    				sham->data.is_empty = 0;
    
    				GetSystemTime(&time);
    				printf("%04d:%02d:%02d-%02d:%02d:%02d\t",time.wYear,time.wMonth,time.wDay,time.wHour+8,time.wMinute,time.wSecond);
    				
    				j = (sham->data.tail-1 >= sham->data.head) ? (sham->data.tail - 1) : (sham->data.tail -1 + buffer_len);
    				
    				for (j; !(sham->data.is_empty)&&(j>=sham->data.head); j--)
    				{
    					printf("%c", sham->data.str[j % buffer_len]);
    				}
    				printf("\t 生产者%d进程 写入 '%c'.\n",curProNum-mainNum, lt);
    				ReleaseSemaphore(sham->semFull, 1, NULL);
    			}
    			UnmapViewOfFile(pFile);
    			pFile = NULL;
    		}
    		CloseHandle(hFileMapping);
    	}
    	
    	//3个消费者进程
    	else if (curProNum >= Customer_Num_from && curProNum <= Customer_Num_to)
    	{
    		HANDLE hFileMapping = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, SHM_NAME);
    		if (hFileMapping == NULL)
    		{
    			printf("打开文件映射失败:%d\n",GetLastError());
    			return;
    		}
    		LPVOID pFile = MapViewOfFile(hFileMapping, FILE_MAP_ALL_ACCESS, 0, 0, 0);
    		if (pFile == NULL)
    		{
    			printf("文件映射视图失败:%d\n",GetLastError());
    			return;
    		}
    		else
    		{
    			struct shareMemory *sham = (struct shareMemory*)(pFile);
    			sham->semEmpty = OpenSemaphore(SEMAPHORE_ALL_ACCESS,FALSE,"SEM_EMPTY");
    			sham->semFull = OpenSemaphore(SEMAPHORE_ALL_ACCESS,FALSE,"SEM_FULL");
    			
    			for (i=0; i<Works_Customer; i++)
    			{
    				WaitForSingleObject(sham->semFull, INFINITE);
    				Sleep(get_random());
    				lt = sham->data.str[sham->data.head];
    				sham->data.head = (sham->data.head + 1) % buffer_len;
    				sham->data.is_empty = (sham->data.head == sham->data.tail);
    				GetSystemTime(&time);
    				printf("%04d:%02d:%02d-%02d:%02d:%02d\t",time.wYear,time.wMonth,time.wDay,time.wHour+8,time.wMinute,time.wSecond);
    				
    				j = (sham->data.tail-1 >= sham->data.head) ? (sham->data.tail - 1) : (sham->data.tail -1 + buffer_len);
    				
    				for (j; !(sham->data.is_empty)&&(j>=sham->data.head); j--)
    				{
    					printf("%c", sham->data.str[j % buffer_len]);
    				}
    				printf("\t 消费者%d进程 读取 '%c'. \n",curProNum-Producer_Num_to, lt);
    				ReleaseSemaphore(sham->semEmpty, 1, NULL);
    			}
    			UnmapViewOfFile(pFile);
    			pFile = NULL;
    		}
    		CloseHandle(hFileMapping);
    	}
    	
    	//关闭主进程的句柄
    	CloseHandle(handleFileMapping);
    	handleFileMapping = INVALID_HANDLE_VALUE;
    	return 0;
    }
    

    
    
    Linux版本
    /********************************************/
    /*名称:ProducerAndCustomer.h
    /*描述:生产者与消费者的子函数定义和声明
    /*作者:野狼
    /*日期:2017-03-26
    /********************************************/
    
    #ifndef __PRODUCERANDCUSTOMER_H
    #define __PRODUCERANDCUSTOMER_H
    #include <stdio.h>
    #include <unistd.h>
    #include <time.h>
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <sys/sem.h>
    
    //2个生产者,每个生产者工作6次
    #define Need_Producer 2
    #define Works_Producer 6
    
    //3个消费者,每个消费者工作4次
    #define Need_Customer 3
    #define Works_Customer 4
    
    //缓冲区为3
    #define buffer_len 3
    
    #define MYBUF_LEN (sizeof(struct mybuffer))
    
    #define SHM_MODE 0600//可读可写
    #define SEM_ALL_KEY 1234
    #define SEM_EMPTY 0
    #define SEM_FULL 1
    
    //缓冲区的结构
    struct mybuffer
    {
    	char str[buffer_len];
    	int head;
    	int tail;
    	int is_empty;
    };
    
    //得到10以内的一个随机数
    int get_random()
    {
    	int digit;
    	srand((unsigned)(getpid() + time(NULL)));
    	digit = rand() % 10;
    	return digit;
    }
    
    //得到A~Z的一个随机字母
    char get_letter()
    {
    	char letter;
    	srand((unsigned)(getpid() + time(NULL)));
    	letter = (char)((rand() % 26) + 'A');
    	return letter;
    }
    
    //P操作
    void P(int sem_id, int sem_num)
    {
    	struct sembuf xx;
    	xx.sem_num = sem_num;//信号量的索引
    	xx.sem_op = -1;//信号量的操作值
    	xx.sem_flg = 0;//访问标志
     	semop(sem_id,&xx,1);//一次需进行的操作的数组sembuf中的元素数为1
    }
    
    //V操作
    void V(int sem_id, int sem_num)
    {
    	struct sembuf xx;
    	xx.sem_num = sem_num;
    	xx.sem_op = 1;
    	xx.sem_flg = 0;
    	semop(sem_id,&xx,1);
    }
    
    #endif
    
    /********************************************/
    /*名称:ProducerAndCustomer.c
    /*描述:生产者与消费者的主函数
    /*作者:野狼
    /*日期:2017-03-28
    /********************************************/
    
    #include "ProducerAndCustomer.h"
    
    int main(int argc, char *argv[])
    {
    	int i, j;
    	int shm_id, sem_id;
    	int num_Producer = 0, num_Customer = 0;
    	struct mybuffer *shmptr;
    	char lt;
    
    	time_t now;
    	struct tm *timenow;
    
    	pid_t pid_p, pid_c;
    
    	//创建一个信号量集合(信号量数为2),返回值为信号量集合的标识号(关键字,信号量数,创建或打开的标志)
    	sem_id = semget(SEM_ALL_KEY,2,IPC_CREAT|0660);
    	if (sem_id >= 0)
    	{
    		printf("主进程开始运行! \n");
    	}
    	//对信号量执行控制操作(信号量集合标识,信号量的索引,要执行的操作命令,设置或返回信号量的参数)
    	semctl(sem_id, SEM_EMPTY, SETVAL, buffer_len);
    	semctl(sem_id, SEM_FULL, SETVAL, 0);
    
    	//申请一个共享内存区,成功返回为共享内存区的标识
    	shm_id = shmget(IPC_PRIVATE, MYBUF_LEN, SHM_MODE);
    	if (shm_id < 0)
    	{
    		printf("申请共享内存区失败!\n");
    		exit(1);
    	}
    
    	//将共享段附加到申请通信的进程空间;成功时返回共享内存附加到进程空间的虚地址,失败时返回-1
    	shmptr = shmat(shm_id, 0, 0);
    	if (shmptr == (void *)-1)
    	{
    		printf("将共享段附加到申请通信的进程空间失败!\n");
    		exit(1);
    	}
    
    	shmptr->head = 0;
    	shmptr->tail = 0;
    	shmptr->is_empty = 1;
    	
    	//2个生产者进程
    	while ((num_Producer++) < Need_Producer)
    	{
    		pid_p = fork();
    		if (pid_p < 0)
    		{
    			printf("创建进程失败!\n");
    			exit(1);
    		}
    		//如果是生产者子进程,开始创建生产者
    		if (pid_p == 0)
    		{
    			//将共享段附加到申请通信的进程空间;成功时返回共享内存附加到进程空间的虚地址,失败时返回-1
    			shmptr = shmat(shm_id, 0, 0);
    			if (shmptr == (void *)-1)
    			{
    				printf("将共享段附加到申请通信的进程空间失败!\n");
    				exit(1);
    			}
    		 	for (i=0; i<Works_Producer; i++)
    			{
    				P(sem_id, SEM_EMPTY);
    				sleep(get_random());
    				shmptr->str[shmptr->tail] = lt = get_letter();
    				shmptr->tail = (shmptr->tail + 1) % buffer_len;
    				shmptr->is_empty = 0;
    				time(&now);
    				timenow = localtime(&now);
    				now = time(NULL);
    				printf("%s ",asctime(timenow));
    				
    				j = (shmptr->tail-1 >= shmptr->head) ? (shmptr->tail-1) : (shmptr->tail-1+buffer_len);
    				for (j; !(shmptr->is_empty) && j >= shmptr->head; j--)
    				{
    					printf("%c", shmptr->str[j%buffer_len]);
    				}
    				printf("\t 生产者 %d  放入 '%c'. \n",num_Producer,lt);
    				fflush(stdout);
    				V(sem_id,SEM_FULL);
    			}
    			//将共享段与进程之间解除连接
    			shmdt(shmptr);
    			exit(0);
    		}
    	}
    	//3个消费者进程
    	while ((num_Customer++) < Need_Customer)
    	{
    		pid_c = fork();
    		if (pid_c < 0)
    		{
    			printf("创建进程失败!\n");
    			exit(1);
    		}
    		//如果是消费者子进程,开始创建消费者
    		if (pid_c == 0)
    		{
    			//将共享段附加到申请通信的进程空间;成功时返回共享内存附加到进程空间的虚地址,失败时返回-1
    			shmptr = shmat(shm_id, 0, 0);
    			if (shmptr == (void *)-1)
    			{
    				printf("将共享段附加到申请通信的进程空间失败!\n");
    				exit(1);
    			}
    		 	for (i=0; i<Works_Customer; i++)
    			{
    				P(sem_id, SEM_FULL);
    				sleep(get_random());
    				lt = shmptr->str[shmptr->head];
    				shmptr->head = (shmptr->head + 1) % buffer_len;
    				shmptr->is_empty = (shmptr->head == shmptr->tail);
    
    				time(&now);
    				timenow = localtime(&now);
    				now = time(NULL);
    				printf("%s ",asctime(timenow));
    
    				j = (shmptr->tail-1 >= shmptr->head) ? (shmptr->tail-1) : (shmptr->tail-1+buffer_len);
    				for (j; !(shmptr->is_empty) && j >= shmptr->head; j--)
    				{
    					printf("%c", shmptr->str[j%buffer_len]);
    				}
    				printf("\t 消费者 %d  取出 '%c'. \n",num_Customer,lt);
    				fflush(stdout);
    				V(sem_id,SEM_EMPTY);
    			}
    			//将共享段与进程之间解除连接
    			shmdt(shmptr);
    			exit(0);
    		}
    	}
    
    	//主进程最后退出
    	while (wait(0) != -1);
    	//将共享段与进程之间解除连接
    	shmdt(shmptr);
    	//对共享内存区执行控制操作
    	shmctl(shm_id,IPC_RMID,0);//当cmd为IPC_RMID时,删除该共享段
    	shmctl(sem_id,IPC_RMID,0);
    	printf("主进程运行结束!\n");
    	fflush(stdout);
    	exit(0);
    }
    
    

    展开全文
  • Java多种方式解决生产者消费者问题(十分详细)

    万次阅读 多人点赞 2018-08-16 08:40:50
    生产者消费者问题 一、问题描述 生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,...

    一、问题描述

    生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

    示意图:
    生产者消费者


    二、解决方法

    思路

    1. 采用某种机制保护生产者和消费者之间的同步。有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。

    2. 在生产者和消费者之间建立一个管道。管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

    解决问题的核心

       保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。

    Java能实现的几种方法

    1. wait() / notify()方法

    2. await() / signal()方法

    3. BlockingQueue阻塞队列方法

    4. 信号量

    5. 管道


    三、代码实现

    1. wait() / notify()方法

    当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;
    当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。

    当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;
    当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

    仓库Storage.java

    import java.util.LinkedList;
    
    public class Storage {
    
        // 仓库容量
        private final int MAX_SIZE = 10;
        // 仓库存储的载体
        private LinkedList<Object> list = new LinkedList<>();
    
        public void produce() {
            synchronized (list) {
                while (list.size() + 1 > MAX_SIZE) {
                    System.out.println("【生产者" + Thread.currentThread().getName()
    		                + "】仓库已满");
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.add(new Object());
                System.out.println("【生产者" + Thread.currentThread().getName()
                        + "】生产一个产品,现库存" + list.size());
                list.notifyAll();
            }
        }
    
        public void consume() {
            synchronized (list) {
                while (list.size() == 0) {
                    System.out.println("【消费者" + Thread.currentThread().getName() 
    						+ "】仓库为空");
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.remove();
                System.out.println("【消费者" + Thread.currentThread().getName()
                        + "】消费一个产品,现库存" + list.size());
                list.notifyAll();
            }
        }
    }
    

    生产者

    public class Producer implements Runnable{
        private Storage storage;
    
        public Producer(){}
    
        public Producer(Storage storage){
            this.storage = storage;
        }
    
        @Override
        public void run(){
            while(true){
                try{
                    Thread.sleep(1000);
                    storage.produce();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
    

    消费者

    public class Consumer implements Runnable{
        private Storage storage;
    
        public Consumer(){}
    
        public Consumer(Storage storage){
            this.storage = storage;
        }
    
        @Override
        public void run(){
            while(true){
                try{
                    Thread.sleep(3000);
                    storage.consume();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
    

    主函数

    public class Main {
    
        public static void main(String[] args) {
            Storage storage = new Storage();
            Thread p1 = new Thread(new Producer(storage));
            Thread p2 = new Thread(new Producer(storage));
            Thread p3 = new Thread(new Producer(storage));
    
            Thread c1 = new Thread(new Consumer(storage));
            Thread c2 = new Thread(new Consumer(storage));
            Thread c3 = new Thread(new Consumer(storage));
    
            p1.start();
            p2.start();
            p3.start();
            c1.start();
            c2.start();
            c3.start();
        }
    }
    

    运行结果

    【生产者p1】生产一个产品,现库存1
    【生产者p2】生产一个产品,现库存2
    【生产者p3】生产一个产品,现库存3
    【生产者p1】生产一个产品,现库存4
    【生产者p2】生产一个产品,现库存5
    【生产者p3】生产一个产品,现库存6
    【生产者p1】生产一个产品,现库存7
    【生产者p2】生产一个产品,现库存8
    【消费者c1】消费一个产品,现库存7
    【生产者p3】生产一个产品,现库存8
    【消费者c2】消费一个产品,现库存7
    【消费者c3】消费一个产品,现库存6
    【生产者p1】生产一个产品,现库存7
    【生产者p2】生产一个产品,现库存8
    【生产者p3】生产一个产品,现库存9
    【生产者p1】生产一个产品,现库存10
    【生产者p2】仓库已满
    【生产者p3】仓库已满
    【生产者p1】仓库已满
    【消费者c1】消费一个产品,现库存9
    【生产者p1】生产一个产品,现库存10
    【生产者p3】仓库已满
    。。。。。。以下省略
    

    一个生产者线程运行produce方法,睡眠1s;一个消费者运行一次consume方法,睡眠3s。此次实验过程中,有3个生产者和3个消费者,也就是我们说的多对多的情况。仓库的容量为10,可以看出消费的速度明显慢于生产的速度,符合设定。

    注意:

    notifyAll()方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。

    2. await() / signal()方法

    在JDK5中,用ReentrantLock和Condition可以实现等待/通知模型,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

    在这里只需改动Storage类

    import java.util.LinkedList;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Storage {
    
        // 仓库最大存储量
        private final int MAX_SIZE = 10;
        // 仓库存储的载体
        private LinkedList<Object> list = new LinkedList<Object>();
        // 锁
        private final Lock lock = new ReentrantLock();
        // 仓库满的条件变量
        private final Condition full = lock.newCondition();
        // 仓库空的条件变量
        private final Condition empty = lock.newCondition();
    
        public void produce()
        {
            // 获得锁
            lock.lock();
            while (list.size() + 1 > MAX_SIZE) {
                System.out.println("【生产者" + Thread.currentThread().getName()
    		             + "】仓库已满");
                try {
                    full.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(new Object());
            System.out.println("【生产者" + Thread.currentThread().getName() 
    				 + "】生产一个产品,现库存" + list.size());
    
            empty.signalAll();
            lock.unlock();
        }
    
        public void consume()
        {
            // 获得锁
            lock.lock();
            while (list.size() == 0) {
                System.out.println("【消费者" + Thread.currentThread().getName()
    		             + "】仓库为空");
                try {
                    empty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.remove();
            System.out.println("【消费者" + Thread.currentThread().getName()
    		         + "】消费一个产品,现库存" + list.size());
    
            full.signalAll();
            lock.unlock();
        }
    }
    

    运行结果与wait()/notify()类似

    3. BlockingQueue阻塞队列方法

    BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

    put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
    take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Storage {
    
        // 仓库存储的载体
        private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10);
    
        public void produce() {
            try{
                list.put(new Object());
                System.out.println("【生产者" + Thread.currentThread().getName()
                        + "】生产一个产品,现库存" + list.size());
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    
        public void consume() {
            try{
                list.take();
                System.out.println("【消费者" + Thread.currentThread().getName()
                        + "】消费了一个产品,现库存" + list.size());
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
    

    可能会出现put()或take()和System.out.println()输出不匹配的情况,是由于它们之间没有同步造成的。BlockingQueue可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。

    4. 信号量

    Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行。)。

    import java.util.LinkedList;
    import java.util.concurrent.Semaphore;
    
    public class Storage {
    
        // 仓库存储的载体
        private LinkedList<Object> list = new LinkedList<Object>();
    	// 仓库的最大容量
        final Semaphore notFull = new Semaphore(10);
        // 将线程挂起,等待其他来触发
        final Semaphore notEmpty = new Semaphore(0);
        // 互斥锁
        final Semaphore mutex = new Semaphore(1);
    
        public void produce()
        {
            try {
                notFull.acquire();
                mutex.acquire();
                list.add(new Object());
                System.out.println("【生产者" + Thread.currentThread().getName()
                        + "】生产一个产品,现库存" + list.size());
            }
            catch (Exception e) {
                e.printStackTrace();
            } finally {
                mutex.release();
                notEmpty.release();
            }
        }
    
        public void consume()
        {
            try {
                notEmpty.acquire();
                mutex.acquire();
                list.remove();
                System.out.println("【消费者" + Thread.currentThread().getName()
                        + "】消费一个产品,现库存" + list.size());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mutex.release();
                notFull.release();
            }
        }
    }
    

    5. 管道

    一种特殊的流,用于不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。

    inputStream.connect(outputStream)或outputStream.connect(inputStream)作用是使两个Stream之间产生通信链接,这样才可以将数据进行输出与输入。

    这种方式只适用于两个线程之间通信,不适合多个线程之间通信。

    1. PipedInputStream / PipedOutputStream (操作字节流)

    Producer

    import java.io.IOException;
    import java.io.PipedOutputStream;
    
    public class Producer implements Runnable {
    
        private PipedOutputStream pipedOutputStream;
    
        public Producer() {
            pipedOutputStream = new PipedOutputStream();
        }
    
        public PipedOutputStream getPipedOutputStream() {
            return pipedOutputStream;
        }
    
        @Override
        public void run() {
            try {
                for (int i = 1; i <= 5; i++) {
                    pipedOutputStream.write(("This is a test, Id=" + i + "!\n").getBytes());
                }
                pipedOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    Consumer

    import java.io.IOException;
    import java.io.PipedInputStream;
    
    public class Consumer implements Runnable {
        private PipedInputStream pipedInputStream;
    
        public Consumer() {
            pipedInputStream = new PipedInputStream();
        }
    
        public PipedInputStream getPipedInputStream() {
            return pipedInputStream;
        }
    
        @Override
        public void run() {
            int len = -1;
            byte[] buffer = new byte[1024];
            try {
                while ((len = pipedInputStream.read(buffer)) != -1) {
                    System.out.println(new String(buffer, 0, len));
                }
                pipedInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    主函数

    import java.io.IOException;
    
    public class Main {
    
        public static void main(String[] args) {
            Producer p = new Producer();
            Consumer c = new Consumer();
            Thread t1 = new Thread(p);
            Thread t2 = new Thread(c);
            try {
                p.getPipedOutputStream().connect(c.getPipedInputStream());
                t2.start();
                t1.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    2. PipedReader / PipedWriter (操作字符流)

    Producer

    import java.io.IOException;
    import java.io.PipedWriter;
    
    public class Producer implements Runnable {
    
        private PipedWriter pipedWriter;
    
        public Producer() {
            pipedWriter = new PipedWriter();
        }
    
        public PipedWriter getPipedWriter() {
            return pipedWriter;
        }
    
        @Override
        public void run() {
            try {
                for (int i = 1; i <= 5; i++) {
                    pipedWriter.write("This is a test, Id=" + i + "!\n");
                }
                pipedWriter.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    Consumer

    import java.io.IOException;
    import java.io.PipedReader;
    
    public class Consumer implements Runnable {
        private PipedReader pipedReader;
    
        public Consumer() {
            pipedReader = new PipedReader();
        }
    
        public PipedReader getPipedReader() {
            return pipedReader;
        }
    
        @Override
        public void run() {
            int len = -1;
            char[] buffer = new char[1024];
            try {
                while ((len = pipedReader.read(buffer)) != -1) {
                    System.out.println(new String(buffer, 0, len));
                }
                pipedReader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    主函数

    import java.io.IOException;
    
    public class Main {
    
        public static void main(String[] args) {
            Producer p = new Producer();
            Consumer c = new Consumer();
            Thread t1 = new Thread(p);
            Thread t2 = new Thread(c);
            try {
                p.getPipedWriter().connect(c.getPipedReader());
                t2.start();
                t1.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    想查看上面几种方式的完整代码,请点击这里:生产者消费者问题的一些实验


    参考

    《Java多线程编程核心技术》 高洪岩
    生产者/消费者问题的多种Java实现方式
    Producer–consumer problem – Wikipedia
    Semaphore的一种使用方法
    Semaphore实现的生产者消费者程序

    展开全文
  • 【操作系统】生产者消费者问题

    万次阅读 多人点赞 2018-08-11 00:43:20
    生产者消费者模型 生产者消费者模型 一、 生产者消费者问题 ... 生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案...

    生产者消费者模型

    一、 生产者消费者问题

    生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
    .
    要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

    这里写图片描述


    二、 问题分析

    该问题需要注意的几点:

    • 在缓冲区为空时,消费者不能再进行消费
    • 在缓冲区为满时,生产者不能再进行生产
    • 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步
    • 注意条件变量与互斥锁的顺序

    这里写图片描述
    由于前两点原因,因此需要保持线程间的同步,即一个线程消费(或生产)完,其他线程才能进行竞争CPU,获得消费(或生产)的机会。对于这一点,可以使用条件变量进行线程间的同步:生产者线程在product之前,需要wait直至获取自己所需的信号量之后,才会进行product的操作;同样,对于消费者线程,在consume之前需要wait直到没有线程在访问共享区(缓冲区),再进行consume的操作,之后再解锁并唤醒其他可用阻塞线程。

    这里写图片描述
    在访问共享区资源时,为避免多个线程同时访问资源造成混乱,需要对共享资源加锁,从而保证某一时刻只有一个线程在访问共享资源。


    三、 伪代码实现

    假设缓冲区大小为10,生产者、消费者线程若干。生产者和消费者相互等效,只要缓冲池未满,生产者便可将消息送入缓冲池;只要缓冲池未空,消费者便可从缓冲池中取走一个消息。

    • items代表缓冲区已经使用的资源数,spaces代表缓冲区可用资源数
    • mutex代表互斥锁
    • buf[10] 代表缓冲区,其内容类型为item
    • in、out代表第一个资源和最后一个资源
    var items = 0, space = 10, mutex = 1;
    var in = 0, out = 0;
    item buf[10] = { NULL };
    
    producer {
        while( true ) {
            wait( space );  // 等待缓冲区有空闲位置, 在使用PV操作时,条件变量需要在互斥锁之前
            wait( mutex );  // 保证在product时不会有其他线程访问缓冲区
    
            // product
            buf.push( item, in );  // 将新资源放到buf[in]位置 
            in = ( in + 1 ) % 10;
            
            signal( mutex );  // 唤醒的顺序可以不同
            signal( items );  // 通知consumer缓冲区有资源可以取走
        }
    }
    
    consumer {
        while( true ) {
            wait( items );  // 等待缓冲区有资源可以使用
            wait( mutex );  // 保证在consume时不会有其他线程访问缓冲区
    
            // consume
            buf.pop( out );  // 将buf[out]位置的的资源取走
            out = ( out + 1 ) % 10;
    
            signal( mutex );  // 唤醒的顺序可以不同
            signal( space );  // 通知缓冲区有空闲位置
        }
    }
    

    不能将线程里两个wait的顺序调换否则会出现死锁。例如(调换后),将consumer的两个wait调换,在producer发出signal信号后,如果producer线程此时再次获得运行机会,执行完了wait(space),此时,另一个consumer线程获得运行机会,执行了 wait(mutex) ,如果此时缓冲区为空,那么consumer将会阻塞在wait(items),而producer也会因为无法获得锁的所有权所以阻塞在wait(mutex),这样两个线程都在阻塞,也就造成了死锁。


    四、代码实现(C++)

    #include <iostream>
    #include <string.h>
    #include <pthread.h>
    #include <unistd.h>
    using namespace std;
    
    int current = 0;  // producer运行加1,consumer运行减1
    int buf[10];
    int in = 0, out = 0;
    int items = 0, spaces = 10;
    bool flag;  // 标记线程结束运行
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t notfull = PTHREAD_COND_INITIALIZER;  // 缓冲区不满
    pthread_cond_t notempty = PTHREAD_COND_INITIALIZER;  // 缓冲区不空
    
    void *producer( void *arg ) {
        while( flag ) {
            pthread_mutex_lock( &mutex );  // 为保证条件变量不会因为多线程混乱,所以先加锁
            while( !spaces ) {  // 避免“惊群”效应,避免因其他线程实现得到事件而导致该线程“假醒”
                pthread_cond_wait( &notfull, &mutex );
            }
            buf[in] = current++;
            in = ( in + 1 ) % 10;
            items++;
            spaces--;
    
            printf( "producer %zu , current = %d\n", pthread_self(), current );
            for( int i = 0; i < 10; i++ ) {
                printf( "%-4d", buf[i] );
            }
            printf( "\n\n" );
    
            pthread_cond_signal( &notempty );
            pthread_mutex_unlock( &mutex );
        }
        pthread_exit( NULL );
    }
    
    void *consumer( void *arg ) {
        while( flag ) {
            pthread_mutex_lock( &mutex );
            while( !items ) {
                pthread_cond_wait( &notempty, &mutex );
            }
            buf[out] = -1;
            out = ( out + 1 ) % 10;
            current--;
            items--;
            spaces++;
    
            printf( "consumer %zu , current = %d\n", pthread_self(), current );
            for( int i = 0; i < 10; i++ ) {
                printf( "%-4d", buf[i] );
            }
            printf( "\n\n" );
    
            pthread_cond_signal( &notfull );
            pthread_mutex_unlock( &mutex );
        }
        pthread_exit( NULL );
    }
    
    int main() {
        memset( buf, -1, sizeof(buf) );
        flag = true;
        pthread_t pro[10], con[10];
        int i = 0;
    
        for( int i = 0; i < 10; i++ ) {
            pthread_create( &pro[i], NULL, producer, NULL );
            pthread_create( &con[i], NULL, consumer, NULL );
        }
    
        sleep(1);  // 让线程运行一秒
        flag = false;
    
        for( int i = 0; i < 10; i++ ) {
            pthread_join( pro[i], NULL );
            pthread_join( con[i], NULL );
        }
    
        return 0;
    } 
    

    五、 互斥锁与条件变量的使用比较

    我们会发现,在伪代码中强调了条件变量在前,互斥锁在后,而到了代码实现时又变成了先加互斥锁,再进行循环pthread_cond_wait()。这不是自相矛盾吗?

    其实,在伪代码中的wait()signal()就是操作系统中的PV操作,而PV操作定义就保证了该语句是原子操作,因此在wait条件变量改变的时候不会因为多进程同时访问共享资源造成混乱,所以为了保证线程间的同步,需要先加条件变量,等事件可使用后才进行线程相应的操作,此时互斥锁的作用是保证共享资源不会被其他线程访问。

    而在代码实现中,signal()对应的时pthread_cond_wait()函数,该函数在执行时会有三步:

    • 解开当前的锁
    • 等待条件变量达到所需要的状态
    • 再把之前解开的锁加锁

    为了实现将pthread_cond_wait()变成原子操作,就需要在该函数之前添加互斥锁。因为pthread_cond_wait()可以解锁,也就不会发生像伪代码所说的死锁问题。相反,如果像伪代码那样先使用条件变量,后加锁,则会造成多个线程同时访问共享资源的问题,造成数据的混乱。


    欢迎关注微信公众号,不定时分享学习资料与学习笔记,感谢!
    在这里插入图片描述

    展开全文
  • 生产者消费者问题 C++实现

    万次阅读 多人点赞 2018-06-29 17:00:03
    生产者消费者问题 C++实现 知识准备 thread 介绍 成员类 成员函数 sleep_for 介绍 mutex 介绍 成员函数 unique_lock 介绍 成员函数 codition_variable 介绍 成员函数 代码示例 生产者消费者问题 ...

    生产者消费者问题 C++实现

    知识准备

    thread

    介绍

    • 定位于头文件的class thread
    • 表示单个执行线程, 没有两个thread对象会表示同一个线程
      • 不可复制构造
      • 不可复制赋值

    成员类

    成员函数

    • get_id

      • 返回线程的id
    • hardware_concurrency

      • 返回实现支持的并发线程数
    • join

      • 等待线程完成其执行

    sleep_for

    介绍

    • 定义域头文件

    • 声明

      • template< class Rep, class Period >
        void sleep_for( const std::chrono::duration<Rep, Period>& sleep_duration );
        
      • sleep_duration : 要睡眠的时长

    mutex

    介绍

    • 定义于头文件

    • mutex 提供排他性非递归所有权语义:

      • 调用方线程从它成功调用 locktry_lock 开始,到它调用 unlock 为止占有 mutex
      • 线程占有 mutex 时,所有其他线程若试图要求 mutex 的所有权,则将阻塞(对于 lock 的调用)或收到 false 返回值(对于 try_lock ).
      • 调用方线程在调用 locktry_lock 前必须不占有 mutex

    成员函数

    • lock

      • 锁定互斥, 若互斥不可用则堵塞
    • unlock

      • 解锁互斥

    unique_lock

    介绍

    • 定义于

    • 声明

      template< class Mutex >
      class unique_lock;
      

    成员函数

    • lock
      • 锁定关联互斥
    • unlock
      • 解锁关联互斥
    • mutex
      • 返回指向关联互斥的指针

    codition_variable

    介绍

    • 定义于头文件 <condition_variable>
    • 声明class condition_variable;
    • 有意修改变量的线程必须:
        1. 获得std:: mutex(通过std::unique_lock)
        2. 在保有锁时进行修改
        3. 执行 notify_onenotify_all
    • 任何有意在 std::condition_variable 上等待的线程必须
        1. 获得std::unique_lock <std::mutex>
        2. 执行 waitwait_forwait_until ,等待操作自动释放互斥,并悬挂线程的执行
        3. 线程被唤醒,且自动重获得互斥
    • **std::condition_variable只可与 std::unique_lock<std::mutex>一同使用;

    成员函数

    • notify_one

      • 通知一个等待线程
    • notify_all

      • 通知所有等待线程
    • wait

      • 阻塞当前进程, 直至被唤醒
    • wait_for

      • 阻塞当前线程,直到条件变量被唤醒,或到指定时限时长后
      • wait_until
        • 阻塞当前线程,直到条件变量被唤醒,或直到抵达指定时间点
          

    代码示例

    // operator_system.cpp: 定义控制台应用程序的入口点。
    //
     
    #include "stdafx.h"
    #include<iostream>
    #include <mutex>
    #include <condition_variable>
    #include <windows.h>
    #include <thread>
     
    using namespace std;
     
    static const int buffer_size = 10; // 缓存大小
    static const int item_total = 100; //总共要生产 item_total个item
     
    // 缓存结构体, 使用循环队列当做缓存
    struct Buffer 
    {
    	int buffer[buffer_size];
    	size_t read_position; // 当前读位置
    	size_t write_position; // 当前写位置
    	mutex mtx; // 读写互斥
    	//条件变量
    	condition_variable not_full; 
    	condition_variable not_empty;
    }buffer_res;
     
    typedef struct Buffer Buffer;
     
    void porduce_item(Buffer *b, int item)
    {
    	unique_lock<mutex> lock(b->mtx);//设置互斥锁
     
    	while(((b->write_position + 1) % buffer_size) == b->read_position) {
    		//当前缓存已经满了
    		cout << "buffer is full now, producer is wating....." << endl;
    		(b->not_full).wait(lock); // 等待缓存非full
    	}
    	// 向缓存中添加item
    	(b->buffer)[b->write_position] = item;
    	(b->write_position)++;
     
    	// 若到达最后一个, 写位置置位0
    	if (b->write_position == buffer_size)
    		b->write_position = 0;
     
    	(b->not_empty).notify_all();
    	lock.unlock();
    }
     
    int consume_item(Buffer *b)
    {
    	int data;
    	unique_lock <mutex> lock(b->mtx);
    	while (b->write_position == b->read_position)
    	{	// 当前buffer 为空
    		cout << "buffer is empty , consumer is waiting....." << endl;
    		(b->not_empty).wait(lock);
    	}
     
    	data = (b->buffer)[b->read_position];
    	(b->read_position)++;
     
    	if (b->read_position >= buffer_size)
    		b->read_position = 0;
     
    	(b->not_full).notify_all();
    	lock.unlock();
     
    	return data;
    }
     
    //生产者任务
    void producer() {
    	for (int i = 1; i<= item_total;i++) {
    		cout << "prodece the " << i << "^th item ..." << endl;
    		porduce_item(&buffer_res, i);
    	}
    }
     
    //消费者任务
    void consumer()
    {
    	static int cnt = 0;
    	while(1) {
    		Sleep(1);
    		int item = consume_item(&buffer_res);
    		cout << "consume the " << item << "^th item" << endl;
    		if (++cnt == item_total)
    			break;
    	}
    }
     
    //初始化 buffer
    void init_buffer(Buffer *b)
    {
    	b->write_position = 0;
    	b->read_position = 0;
    }
     
    int main()
    {
    	init_buffer(&buffer_res);
    	thread prodece(producer);
    	thread consume(consumer);
    	prodece.join();
    	consume.join();
    	getchar();
    }
     
    

    查看其他精彩文章
    深入分析HashMap
    AOP核心原理和SpringAOP
    10分钟入门SpringAOP

    展开全文
  • 信号量与生产者消费者问题

    万次阅读 多人点赞 2017-01-19 15:06:44
    生产者—消费者问题 生产者—消费者题型在各类考试(考研、程序员证书、程序员面试笔试、期末考试)很常见,原因之一是生产者...生产者—消费者题型最基本的是有界缓冲区的生产者消费者问题和无界缓冲区的生产者消费者
  • 操作系统_生产者消费者问题

    千次阅读 多人点赞 2020-03-17 21:29:53
    1,生产者消费者问题 问题的提出 初步思考 进程资源共享关系和同步关系分析 问题的具体解决 第一搏 存在的问题 第二搏 多维度思考 1,单生产者、单消费者、多缓冲区 2,多生产者、多消费者、单缓冲 3,单...
  • 秒杀多线程第十篇 生产者消费者问题

    万次阅读 多人点赞 2012-05-21 10:18:09
    继经典线程同步问题之后,我们来看看生产者消费者问题及读者写者问题。生产者消费者问题是一个著名的线程同步问题,该问题描述如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和...
  • Java可视化实现生产者消费者问题

    千次阅读 2019-09-28 22:14:06
    引言:生产者消费者问题是一个十分经典的多线程问题。为了更加形象地描述这个问题,采用可视化的形式展示此过程。
  • 生产者消费者问题源程序

    千次阅读 2015-11-01 04:18:18
    生产者消费者问题(Producer-consumer problem)是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是...
  • 生产者消费者问题、Java实现

    千次阅读 2016-05-10 00:11:58
    生产者消费者问题(Producer-consumer problem)也可以叫有限缓冲问题(Bounded-buffer problem),是一个经典的进程/线程同步问题。
  • 生产者消费者问题-代码详解(Java多线程)

    千次阅读 热门讨论 2020-06-13 11:10:07
    文章目录一、生产者消费者问题二、代码实现三、拓展知识 一、生产者消费者问题 生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的...
  • 操作系统模拟生产者消费者问题

    千次阅读 2018-04-19 11:08:07
    pv操作及生产者消费者问题解析 #include&lt;stdio.h&gt; #include&lt;stdlib.h&gt; #include&lt;string.h&gt; #include&lt;time.h&gt; #include&lt;windows.h&gt; void ...
  • 1.生产者消费者问题 1.1.问题分析 1.2.如何实现 2.多生产者多消费者问题 2.1.问题描述 2.2.问题分析 2.3.如何实现 1.生产者消费者问题 1.1.问题分析 1.2.如何实现 2.多生产者多消费者问题 多是...
  • 生产者消费者问题 多个生产者和多个生产者的问题。生产者不断的向仓库放入产品,消费者不断的从仓库取出产品,仓库的容量是有限的。因此,当仓库处于满状态时,生产者必须等待消费者取出 1 个或多个产品后才能继续...
  • 关于生产者消费者问题的OpenMP实现

    千次阅读 2018-07-23 11:53:46
    关于生产者消费者问题的OpenMP实现 1. 博客内容: 针对典型的生产者和消费者问题,使用OpenMP编程,实现生产者生成随机数,由消费者求和并打印的操作。 2. 问题分析: 数据竞争问题:当有多个生产者向同一...
  • (1)线程同步,实现“生产者消费者问题” 要求:缓冲区大小为20,生产者每次放一个产品,消费者每次取走一个产品;生产者和消费者至少2个。 (2)代码如下: #include #include #include #include void *...
  • 生产者消费者问题分析
  • 生产者消费者问题(Windows下C语言版)

    千次阅读 多人点赞 2020-02-16 22:20:17
    生产者消费者问题Producer-consumer problem 也称有限缓冲问题,描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到...
  • 问题生产者消费者问题&amp;amp;amp;amp;amp;哲学家进餐问题&amp;amp;amp;amp;amp;读者写者问题 一. 目录 操作系统5————进程同步的经典问题:司机售票员&amp;amp;amp;amp;amp;问题生产...
  • Linux进程间通信与生产者消费者问题

    千次阅读 2013-01-20 20:43:48
    生产者消费者问题(英语:Producer-consumerproblem),也称有限缓冲问题(英语:Bounded-bufferproblem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和...
  • 生产者消费者问题 这是一个非常经典的多线程题目,题目大意如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个有多个缓冲区的缓冲池,生产者...
  • Java 生产者消费者问题

    万次阅读 2011-11-03 10:35:18
    生产者消费者问题是研究多线程程序时绕不开的问题,它的描述是有一块生产者和消费者共享的有界缓冲区,生产者往缓冲区放入产品,消费者从缓冲区取走产品,这个过程可以无休止的执行,不能因缓冲区满生产者放不进产品...
  • 同步互斥问题 - 生产者消费者问题 问题描述: 有多个进程:多个生产者进程和多个消费者进程共享一个初始为空、固定大小为n的缓存(缓冲区)。生产者的工作是制造数据,只有缓冲区没满时,生产者才能把消息放入...
  • 生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和...
  • 生产者消费者问题问题描述是:有一群生产者进程在生产产品,此产品提供给消费者去消费。为使生产者和消费者进程能并发执行,在它们之间设置一个具有n个缓冲池,生产者进程可将它所生产的产品放入一个缓冲池中,消费...
  • 生产者消费者问题的C语言实现

    万次阅读 2018-06-09 12:52:18
    实验六 生产者/消费者问题实验一、实验目的掌握Linux下生产者/消费者问题算法的实现 二、实验原理1.clone系统调用:功能:创建一个轻进程或线程用法:intclone (int (*fn)(void *arg),void *stack,int flag,void *...
  • 生产者消费者问题(代码实现)

    千次阅读 2016-07-19 22:29:29
    生产者-消费者问题(也被称为有界缓冲器问题)是一个典型的例子多线程同步的问题。问题描述了两个进程,生产者和消费者,谁都有一个共同的,固定大小的缓冲区作为一个队列。制片人的工作是生成数据,把它放入缓冲区...
  • java多线程模拟生产者消费者问题
  • 今天看了一片博文,讲Java多线程之线程的协作,其中作者用程序实例说明了生产者消费者问题,但我及其他读者发现程序多跑几次还是会出现死锁,百度搜了下大都数的例子也都存在bug,经过仔细研究发现其中的问题,并...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 308,630
精华内容 123,452
关键字:

生产者消费者问题