2018-04-16 21:02:08 qq_39511059 阅读数 4058

设计一个程序,程序中有一个生产者进程两个消费者进程。生产者产生1~20的20个数。两个消费者从共享内存中取数。

首先分析题目
生产者和消费者之间存在着协同关系。而两个消费者要满足互斥。所以我们需要三个信号量,一个用来控制互斥,两个用来控制协同

我们知道前一种非常简单,实现互斥就是用一把公用的锁把临界区锁住

而协同关系则复杂一些。我们知道信号量的值就是进程可以使用的资源的个数。对于生产者来说,资源就是空的存储区。对于消费者来说,资源就是存有数据的存储区。所以用两个信号量,一个是empty,一个是full。empty初始值为1代表一开始内存是空闲的,full初始值为0表示没有数据可用。生产者在生产之前他会将empty变成0,然后生产之后他又把full变成1。empty避免了其他生产者在同一块内存上生产数据。而消费者在消费之前就把full变成0,消费之后又把empty变成1。这样两两协同,便实现了协同和互斥的关系。

接下来贴代码

包含头文件并定义所需全局变量:
#include <iostream>
#include <sys/types.h>
#include <sys/sem.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/mman.h>
#include <errno.h>
#include <time.h>
#include<unistd.h>
using namespace std;
        struct sembuf P,V;
        union senum{
            int val;
            struct semid_ds *buf; 
           unsigned short *array;
        }arc;

封装函数:
void sem_init()//初始化信号量的运行环境
{
P.sem_num= 0;
P.sem_op =-1;//信号量在操作时-1
P.sem_flg=SEM_UNDO;
V.sem_num= 0;
V.sem_op =1;//信号量在操作时+1
V.sem_flg=SEM_UNDO;
cout<<"Semaphore Initiate Successfully"<<endl;
}

void v(int sem_id)//v操作
{
    semop(sem_id,&V,1);
}
void p(int sem_id)//p操作
{
    semop(sem_id,&P,1);
}
void sem_crea(int &sem_id,int val)//用val来给信号量赋初值
{
    sem_id=semget(IPC_PRIVATE,1,IPC_CREAT|00666);
    arc.val=val;
    semctl(sem_id,0,SETVAL,arc);
}

主函数:
int main()
{
 sem_init();
 int mutx;
 int full;
 int empt;
 char *shares;
 int *num;
 int shareid;
 shareid=shmget(5,1280,0777|IPC_CREAT);//用同一个key可以得到同一块共享内存
 shares=(char*)shmat(shareid,0,0);//得到共享内存的指针
 sem_crea(mutx,1);
 sem_crea(full,0);
 sem_crea(empt,1);
 num=(int *)shares;
 pid_t p1;
 pid_t p2;
 p1=fork();//fork在子进程中返回0在父进程中返回子进程的pid_t
 p2=fork();
 if(p1==0)//生产者进程
 {
 int n=20;
 while(n--)
 {
 p(empt);
 *num=n;
 v(full);
 }
 sleep(5);
exec(0);
 }
 else if(p2==0)//两个消费者进程
 {
 while(1)
 {
 p(full);
 p(mutx);
 cout<<"comsumer A :"<<*num<<endl;
  v(mutx);
 v(empt);
 }
 }
 else
 {
 while(1)
 {
 p(full);
 p(mutx);
 cout<<"comsumer B :"<<*num<<endl;
 v(mutx);
 v(empt);
 }
 }
 return 0;
}


2019-07-27 18:51:12 chenxiyuehh 阅读数 48

信号量

前面我们在基于阻塞队列的生产者消费者模型 今天我们来看一下基于信号量的生产者消费者模型,也可以说是基于环形队列的。

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
		pshared:0表示线程间共享,非零表示进程间共享
		value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem);

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);

基于环形队列的生产者消费者模型

  • 环形队列采用数组模拟,用模运算来模拟环状特性
  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
  • 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
  • 简单的总结一句话就是,你不能超过我,我不能把你套个圈,这里你是消费者,我是生产者

源码展示

  • ring.hpp
#ifndef __RING_HPP__
#define __RING_HPP__

#include <iostream>
#include <semaphore.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

#define NUM 32

using namespace std;

class Ring{
    private:
        int circle[NUM];//环形队列
        int cap;//信号量的容量
        int c_step;//消费者下标
        int p_step;//生产者下标
        sem_t sem_data;//消费者可以消费的数据
        sem_t sem_blank;//生产者可以生产的位置

        void P(sem_t &s)
        {
            sem_wait(&s);
        }
        void V(sem_t &s)
        {
            sem_post(&s);
        }
    public:
        Ring():cap(NUM)
        {
            c_step = p_step = 0;
            sem_init(&sem_data,0,0);
            sem_init(&sem_blank,0,NUM);
        }

        //消费者调用该接口进行消费
        void GetData(int& out)
        {
            P(sem_data);//消费者申请数据资源消费
            out = circle[c_step];//消费者从环形队列中取数据
            c_step++;
            V(sem_blank);//消费者增加空间
            c_step %= cap;//当走到最后时重新置为0
        }
        //生产者调用该接口进行生产
        void PutData(const int& in)
        {
            P(sem_blank);//生产者申请空间生产
            circle[p_step] = in;//往环形队列中插入数据
            p_step++;//下标++
            V(sem_data);//生产者增加数据
            p_step %= cap;//因为是环形队列,当走到下标最后时将它重置为0
        }

        ~Ring()
        {
            sem_destroy(&sem_data);
            sem_destroy(&sem_blank);
        }
};

#endif
#include "ring.hpp"
#include <time.h>

void* consumer(void* arg)
{
    Ring* r = (Ring*)arg;
    int data;
    for(;;){
        r->GetData(data);
        cout << "consumer data:"<< data <<endl;
    }
}

void* producter(void* arg)
{
    Ring* r = (Ring*)arg;
    for(;;){
        int data = rand() % 100 + 1;
        r->PutData(data);
        cout << "product data:"<< data <<endl;
        sleep(1);
    }
}

int main()
{
    srand((unsigned long)time(NULL));//生成随机数
    Ring r;//定义一个环形队列,基于信号量
    pthread_t c,p;//创建两个线程c为消费者,p为生产者
    pthread_create(&c,NULL,consumer,(void*)&r);//让生产者和消费者都看到该环形队列
    pthread_create(&p,NULL,producter,(void*)&r);


    pthread_join(c,NULL);
    pthread_join(p,NULL);

    return 0;
}
  • Makefile
ring:ring.cc
	g++ -o $@ $^ -lpthread
.PHONY:clean
clean:
	rm -f ring
2015-12-15 17:23:47 youyongyoumou 阅读数 599

该程序为Linux信号量机制实现程序,主要模拟了一般的生产者-消费者问题。(生产者-消费者问题是一个经典的进程同步问题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制。在同一个进程地址空间内执行的两个线程。生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有已用的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
 
#define N 5   // 消费者或者生产者的数目
#define M 10 // 缓冲数目
//int M=10;
int in = 0; // 生产者放置产品的位置
int out = 0; // 消费者取产品的位置
 
int buff[M] = { 0 }; // 缓冲初始化为0,开始时没有产品
 
sem_t empty_sem; // 同步信号量,当满了时阻止生产者放产品
sem_t full_sem; // 同步信号量,当没产品时阻止消费者消费
pthread_mutex_t mutex; // 互斥信号量,一次只有一个线程访问缓冲
 
int product_id = 0; //生产者id
int prochase_id = 0; //消费者id
//信号处理函数
void Handlesignal(int signo){
    printf("程序退出\n",signo);
    exit(0);
}
/* 打印缓冲情况 */
void print() {
       int i;
       printf("产品队列为");
       for(i = 0; i < M; i++)
              printf("%d", buff[i]);
       printf("\n");
}
 
/* 生产者方法 */
void *product() {
       int id = ++product_id;
       while(1) {//重复进行
              //用sleep的数量可以调节生产和消费的速度,便于观察
              sleep(2);
 
              sem_wait(&empty_sem);
              pthread_mutex_lock(&mutex);
 
              in= in % M;
              printf("生产者%d在产品队列中放入第%d个产品\t",id, in);
 
              buff[in]= 1;
              print();
              ++in;
 
              pthread_mutex_unlock(&mutex);
              sem_post(&full_sem);
       }
}
 
/* 消费者方法 */
void *prochase() {
       int id = ++prochase_id;
       while(1) {//重复进行
              //用sleep的数量可以调节生产和消费的速度,便于观察
              sleep(5);
 
              sem_wait(&full_sem);
              pthread_mutex_lock(&mutex);
 
              out= out % M;
              printf("消费者%d从产品队列中取出第%d个产品\t",id, out);
 
              buff[out]= 0;
              print();
              ++out;
 
              pthread_mutex_unlock(&mutex);
              sem_post(&empty_sem);
       }
}
 
int main() {
       printf("生产者和消费者数目都为5,产品缓冲为10,生产者每2秒生产一个产品,消费者每5秒消费一个产品,Ctrl+退出程序\n");
       pthread_t id1[N];
       pthread_t id2[N];
       int i;
       int ret[N];
       //结束程序
    if(signal(SIGINT,Handlesignal)==SIG_ERR){//按ctrl+C产生SIGINT信号
    printf("信号安装出错\n");
    }
// 初始化同步信号量
       int ini1 = sem_init(&empty_sem, 0, M);//产品队列缓冲同步
       int ini2 = sem_init(&full_sem, 0, 0);//线程运行同步
       if(ini1 && ini2 != 0) {
              printf("信号量初始化失败!\n");
              exit(1);
       }
//初始化互斥信号量
       int ini3 = pthread_mutex_init(&mutex, NULL);
       if(ini3 != 0) {
              printf("线程同步初始化失败!\n");
              exit(1);
       }
// 创建N个生产者线程
       for(i = 0; i < N; i++) {
              ret[i]= pthread_create(&id1[i], NULL, product, (void *) (&i));
              if(ret[i] != 0) {
                     printf("生产者%d线程创建失败!\n", i);
                     exit(1);
              }
       }
//创建N个消费者线程
       for(i = 0; i < N; i++) {
              ret[i]= pthread_create(&id2[i], NULL, prochase, NULL);
              if(ret[i] != 0) {
                     printf("消费者%d线程创建失败!\n", i);
                     exit(1);
              }
       }
//等待线程销毁
       for(i = 0; i < N; i++) {
              pthread_join(id1[i], NULL);
              pthread_join(id2[i],NULL);
       }
       exit(0);
}

在linux下编译语句为:gcc -o producer_consumer.out producer_consumer.c -lpthread

这个是因为pthread并非Linux系统的默认库,编译时注意加上-lpthread参数,以调用链接库

2014-07-18 21:44:00 weixin_33730836 阅读数 7

  菜鸟偶遇信号量,擦出火花(只有不熟才会有火花)。于是上网搜资料和看《Unix环境高级编程》实现了几个小例题,高手请勿喷!这几位写得非常好啊:

题目来源: http://www.it165.net/os/html/201312/7039.html

信号量及其用法:http://www.cnblogs.com/hjslovewcl/archive/2011/03/03/2314341.html

Mutex与Semaphore区别著名的厕所理论:http://koti.mbnet.fi/niclasw/MutexSemaphore.html

   哎呀,暴露了!我不是故意偷窥别人的……




 

一:一个生产者、一个消费者、一个资源情况

  这种情况情况可以只用一个信号量,要生成或要消费只用尝试获取这个信号量,这里用了两个:full=1和empty=0,两个只为了和后面一致,1、0是赋初值。生产者和消费者情况如下:

//生产者:
P(empty)
    生成资源并放进资源处
V(full)


//消费者:
P(full)
    消费资源
V(empty)

  若生产者最先开始生产资源,P(empty),full和empty都成了0,此时若消费者想要消费,则P(full)时,full为0则睡眠等待,等生产者生结束就把full加1,看到消费者可怜地睡着了就唤醒它,然后消费者把full减1自己快活去了。

  消费者消费过程中生产者若要生了,则会因为empty为0而休眠,等消费者结束就把empty加1,然后生产者开始生产。

  上面的好理解,下面上代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#include <x86_64-linux-gnu/sys/types.h>
#include <x86_64-linux-gnu/sys/ipc.h>
#include <x86_64-linux-gnu/sys/sem.h>

int semInite(int semId, int value);
int semDelete(int semId);
int semP(int semId);
int semV(int semId);

//declare a union to be used
union semun 
{
    int val;                        /* value for SETVAL */ 
    struct semid_ds *buf;                /* buffer for IPC_STAT, IPC_SET */ 
    unsigned short int *array;         /* array for GETALL, SETALL */ 
    struct seminfo *__buf;                /* buffer for IPC_INFO */ 
};


//semaphore declare
static int semFullId;
static int semEmptyId;
static int source = 0;        //source definition    


//new thread as a consumer
void* child_thread(void* arg)
{
    int ttt = 1;
        
    while(1)
    {
        sleep(rand() % 19);
        printf("child No.%d times wants to consume...\n", ttt);
        
        semP(semFullId);    //
        printf("child No.%d times start consuming. source = %d\n", ttt, source);
        source = 0;
        printf("child No.%d times end consuming. source = %d\n\n", ttt++, source);
        semV(semEmptyId);    //
    }
    
    return (void*)0;
}

int main(void)
{    
    //create semaphore
    semFullId = semget((key_t)1235, 1, 0666 | IPC_CREAT);
    semEmptyId = semget((key_t)1236, 1, 0666 | IPC_CREAT);
    semInite(semFullId, 0);
    semInite(semEmptyId, 1);
    
    pthread_t pid;
    pthread_create(&pid, NULL, child_thread, NULL);
    
    int tt = 1;    
    while(1)
    {
        sleep(rand() % 18);
        printf("parent No.%d times wants to produce...\n", tt);
        
        semP(semEmptyId);    //
        printf("parent No.%d times start producing. source = %d\n", tt, source);
        source = rand() % 100;
        printf("parent No.%d times end producing. source = %d\n", tt++, source);
        semV(semFullId);    //
    }
    
    semDelete(semFullId);
    semDelete(semEmptyId);
    return 0;
}


//set semaphore as default value
int semInite(int semId, int value)
{
    union semun semUnion;
    semUnion.val = value;    //set default semaphore
    return semctl(semId, 0, SETVAL, semUnion);
}

//delete semaphore
int semDelete(int semId)
{
    union semun semUnion;
    return semctl(semId, 0, IPC_RMID, semUnion);
}

//semaphore P operation
int semP(int semId)
{
    struct sembuf semBuf;
    semBuf.sem_num = 0;        //indicate it is not semaphore array
    semBuf.sem_op = -1;        //subtract one
    semBuf.sem_flg = SEM_UNDO;
    
    return semop(semId, &semBuf, 1);    //return value
}

//semaphore V operation
int semV(int semId)
{
    struct sembuf semBuf;
    semBuf.sem_num = 0;        //indicate it is not semaphore array
    semBuf.sem_op = 1;        //subtract one
    semBuf.sem_flg = SEM_UNDO;
    
    return semop(semId, &semBuf, 1);    //return value
}
111

两个信号量其实应该用信号量集的,因为它本来就是针对集合的,但是由于刚入门,为了易懂,就用两个。两个线程,创建的新线程当做消费者了。其中unix的几个信号量的函数看了半天,有点复杂,简单不准确来讲:

//获得一个信号量啦,第二个参数是想要创建的信号量个数,
//因为unix操作的是信号量集合,设为1不就一个信号量了嘛
//其他参数我不管了
int semget(key_t key, int num_sems, int sem_flags);

//信号量集合的操作,这个可以用来实现P、V的 +1 -1 的功能
int semop(int sem_id, struct sembuf *sem_ops, size_t num_sem_ops);

//信号量集合的控制,如初始化删除等
int semctl(int sem_id, int sem_num, int command, ...);

运行:




 

二:一个生产者、一个消费者、N个资源情况

  这里资源用是一个数组代替了。其实本质上和上面类似,每次只让生产者或消费者中的一个进入,进入后放到哪个地方或从哪个地方取就得用一个标志来说明了,其实也可以为每一资源加上信号量的。

  这里在生产者和消费者那里都设置了一个static的变量当做游标,指示下个资源放到哪个位置和下次从哪取资源。staitic变量用在这里很合适,因为只会初始化一次。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#include <x86_64-linux-gnu/sys/types.h>
#include <x86_64-linux-gnu/sys/ipc.h>
#include <x86_64-linux-gnu/sys/sem.h>

#define N 5

int semInite(int semId, int value);
int semDelete(int semId);
int semP(int semId);
int semV(int semId);

//declare a union to be used
union semun 
{
    int val;                        /* value for SETVAL */ 
    struct semid_ds *buf;                /* buffer for IPC_STAT, IPC_SET */ 
    unsigned short int *array;         /* array for GETALL, SETALL */ 
    struct seminfo *__buf;                /* buffer for IPC_INFO */ 
};


//semaphore declare
static int semFullId;
static int semEmptyId;
static int srcArr[N];        //source definition    


//new thread as a consumer
void* child_thread(void* arg)
{
    int ttt = 1;
        
    while(1)
    {
        static int pToGet = 0;    //get source from the position
        sleep(rand() % 19);
        printf("child No.%d times wants to consume(get from index %d)...\n", ttt, pToGet);
        
        semP(semFullId);    //
        printf("child No.%d times start consuming.(get from index %d, data is %d)\n", ttt, pToGet, srcArr[pToGet]);
        srcArr[pToGet] = 0;
        printf("child No.%d times end consuming. (get from index %d)\n\n", ttt++, pToGet);
        pToGet = (pToGet + 1) % N;
        semV(semEmptyId);    //
    }
    
    return (void*)0;
}

int main(void)
{    
    //create semaphore
    semFullId = semget((key_t)1235, 1, 0666 | IPC_CREAT);
    semEmptyId = semget((key_t)1236, 1, 0666 | IPC_CREAT);
    semInite(semFullId, 0);
    semInite(semEmptyId, N);    //N source
    
    pthread_t pid;
    pthread_create(&pid, NULL, child_thread, NULL);
    
    int tt = 1;    
    while(1)
    {
        static int pToPut = 0;        //next position where source to be filled in
        sleep(rand() % 18);
        printf("parent No.%d times wants to produce(put in %d index)...\n", tt, pToPut);
        
        semP(semEmptyId);    //
        printf("parent No.%d times start producing.(put in %d index, original data is %d)\n", tt, pToPut, srcArr[pToPut]);
        int temp = rand() % 100;
        srcArr[pToPut] = temp;
        printf("parent No.%d times end producing.(put in %d index, now data is %d)\n", tt++, pToPut, srcArr[pToPut]);
        pToPut = (pToPut + 1) % N;
        semV(semFullId);    //
    }
    
    semDelete(semFullId);
    semDelete(semEmptyId);
    return 0;
}


//set semaphore as default value
int semInite(int semId, int value)
{
    union semun semUnion;
    semUnion.val = value;    //set default semaphore
    return semctl(semId, 0, SETVAL, semUnion);
}

//delete semaphore
int semDelete(int semId)
{
    union semun semUnion;
    return semctl(semId, 0, IPC_RMID, semUnion);
}

//semaphore P operation
int semP(int semId)
{
    struct sembuf semBuf;
    semBuf.sem_num = 0;        //indicate it is not semaphore array
    semBuf.sem_op = -1;        //subtract one
    semBuf.sem_flg = SEM_UNDO;
    
    return semop(semId, &semBuf, 1);    //return value
}

//semaphore V operation
int semV(int semId)
{
    struct sembuf semBuf;
    semBuf.sem_num = 0;        //indicate it is not semaphore array
    semBuf.sem_op = 1;        //subtract one
    semBuf.sem_flg = SEM_UNDO;
    
    return semop(semId, &semBuf, 1);    //return value
}
222

运行结果:




 

三:N个生产者,N个消费者,N个资源

   这种情况不仅生产者和消费者之间要通过上述的方式协调使用资源,而且生产者内部和消费者内部也要协调。定义四个信号量:

empty——表示缓冲区是否为空,初值为n。
full——表示缓冲区中是否为满,初值为0。
mutex1——生产者之间的互斥信号量,初值为1。
mutex2——消费者之间的互斥信号量,初值为1。

//生产者进程
P(mutex1)
    P(empty)
        生产数据并放进特定位置
    V(full)
V(mutex1)

//消费者进程
P(mutex2)
    P(full)
        消费数据
    V(empty)
V(mutex2)

其实上面生产者或者消费者获取互斥量或信号量的顺序可以颠倒的,不会产生死锁。

  当然这个问题可以用其他更好的方式解决,我还得继续学习。

转载于:https://www.cnblogs.com/jiayith/p/3854312.html

2019-10-29 22:00:30 cat_want_fly 阅读数 11

生产者消费者模型

在上一篇博客[Linux]------基于阻塞队列的生产者消费者模型中,我们实现了单生产者消费者模型,在本篇博客中,我们的生产者消费者模型会有两个改变

  1. 不是用一般队列作为缓冲区,而是用环形队列作为缓冲区
  2. 不使用pthread_mutex,而使用信号量来实现生产者消费者的同步。

POSIX信号量

使用的是POSIX信号量而不是之前的system_v,因为这两个信号量作用相同,都用于同步操作,但POSIX可以用于线程间同步。
这里可以暂时把信号量当成一个计数器,当值未1时代表,有一份资源可以使用,有人申请这份资源时,值会减1,当值为0时,,其他人想继续申请这份资源,会被挂起放入等待队列中。
初始化信号量,在参数中可以传入非0的值n,代表有n个资源。
信号量有两个重要的操作,PV操作,P操作,释放资源,会使资源数+1,V操作,申请资源,会使资源数减1,下面是POSIX的具体函数。

POSIX函数

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem,int pshared, unsigned int value);
参数:
	pshared:0表示线程间共享,非0表示进程间共享
	value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

int sem_wait(sem_t *sem);
功能:等待信号量,将信号量的值减1

发布信号量

int sem_post(sem_t *sem);
功能:发布信号量,表示资源使用完毕,可以归还资源了,将信号量+1

循环队列

和普通队列不同,循环队列首尾相接,生产者P向队列中不断插入数据,当插入满了后,消费者C从队列中读取数据,当消费者C处理完数据后队列成空,生产者P又在队列中插入数据。
在这里插入图片描述
但是有一个问题出现了,当P和V同时指向一个位置时,你无法判断是队列是空还是满(这里暂时不考虑,P比C多走了n圈的可能)
在这里插入图片描述
在这里插入图片描述
上面的信号量解决了这个问题,信号量相当于计数器,所以可以设置两个信号量,一个记录队列里插入数据的数量,另一个信号量记录队列里未插入的空间的数量。

  • 如果对队列进行插入数据操作,记录插入数的信号量执行P操作,记录剩余空间的信号量执行V操作。
  • 如果对队列进行删除数据操作,记录插入数的信号量执行V操作,记录剩余空间的信号量执行P操作。

实际上我们可以用一个信号量,判断当两个指针指向同一片地址时是满还是空,为什么需要两个信号?
原因,假如只有一个信号量,则只能对一个对象进行限制,假设信号量记录未插入的空间记录为0时,消费者阻塞,生产者生产,但是buffer时有限的,但是没有信号量记录插入的数已经使队列满了,换句话说生产者的生产行为是不受限制的,不满足生产者消费者模型。

解决完了判断队列满或空后,还有第二个问题,在STL中没有环形队列,需要我们自己实现,这个难不倒我们,可以利用哈希表的开散列思想,用数组作为环形队列,对插入的地址进行取模,作为数组下标。
在这里插入图片描述
超过数组的长度,进行取模操作

在这里插入图片描述
还剩下最后一个问题,如何确保,生产者和消费在重合后最多差距为1圈。
因为开始的时候记录的空格子信号量计算为最大,记录插入数据的信号量为0。
因此,对于消费者申请资源会被挂起,这样生产者一定会先进行生产,假设生产者的生产速度大于消费者的处理数据的速度,当生产者把循环队列插满后,录的空格子信号量计算为0,当生产者进行下次生产,申请空的资源也会被挂起,直到,记录的空格子信号量不为0为止,这样就保证了生产者和消费在重合后最多差距为1圈。
下面是根据这种思想,写出的单生产者消费者模型。


#include <iostream>
#include <vector>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
using namespace std;

#define NUM 8
class RingQueue
{
    private:
		vector<int> q;
        int cap;//容量
		sem_t data_sem;
		sem_t space_sem;
		int consume_step;
		int product_step;

    public:
		RingQueue(int _cap =NUM):cap(_cap),q(_cap)
    {
			sem_init(&data_sem, 0, 0);
			sem_init(&space_sem, 0, cap);
			consume_step = 0;
			product_step = 0;
    }
     ~RingQueue()
     {
		 sem_destroy(&data_sem);
		 sem_destroy(&space_sem);
    }
    
    void PutData(const int& data)
    {
		sem_wait(&space_sem);
		q[consume_step] = data;
		consume_step++;
		consume_step %= cap;
		sem_post(&data_sem);
    }

    void GetData(int & data)
    {
		sem_wait(&data_sem);
		data = q[product_step] ;
		product_step++;
		product_step %= cap;
		sem_post(&space_sem);
    
    }

};


void* producter(void* arg)
{
 int data;
 srand((unsigned long)time(NULL));
 for(;;)
 {  
	 RingQueue *bq = (RingQueue*) arg;
 data = rand()%1024;
     bq->PutData(data);
    cout<<"the data is push done :"<<data<<endl;
 //   sleep(1);生产者去掉限制
 }


}

void* consumer(void *arg)
{
	RingQueue *bq = (RingQueue*) arg;
 int data;
 for(;;)
 {  bq->GetData(data);
    cout<<"the data is pop done :"<<data<<endl;
    sleep(1);
 }

}



int main()
{
RingQueue bd;
pthread_t c,p;
pthread_create(&c,NULL,consumer,(void*)&bd);//线程为可分离的
pthread_create(&p,NULL,producter,(void*)&bd);

pthread_join(c,NULL);
pthread_join(p,NULL);

}


上面我们使用了环形缓冲区来代替普通缓冲区,环形缓冲区队列相对于普通缓冲区有哪些优点?
环形缓冲区所有的push和pop操作都是在一个固定的存储空间内进行。而队列缓冲区在push的时候,可能会分配存储空间用于存储新元素;在pop时,可能会释放废弃元素的存储空间。所以环形方式相比队列方式,少掉了对于缓冲区元素所用存储空间的分配、释放。这是环形缓冲区的一个主要优势。

总结

  • 基于信号量的生产者消费者模型利用信号对生产者消费者进行制约。
  • 环形缓冲区相对于队列方式减少了对缓冲元素所用的储存空间的分配,释放,这是环形缓冲区的主要优势。

思想和基于阻塞队列的生产者消费者模型没有大的区别,读者应该理解生产者消费者模型的意义和解决问题的方法。

没有更多推荐了,返回首页