2010-10-09 00:19:36 frenchleaf 阅读数 45
[b]就是不改err_sys方法为printf,
我把代码放在/root/Desktop/unix/apue.2e
编辑源码解压生成的apue.2e文件夹下的Make.defines.linux
WKDIR=/home/var/apue.2e为/root/Desktop/unix/apue.2e
然后进入apue.2e/std 目录,编辑linux.mk。修改里面所有的nawk为awk
vim下
:%s/nawk/awk/
在/root/Desktop/unix/apue.2e下运行make[/b]
输出一堆...
结尾是
etenv1.c:4: error: ‘ARG_MAX’ undeclared here (not in a function)
make[2]: *** [getenv1.o] 错误 1
make[2]:正在离开目录 `/root/Desktop/unix/apue.2e/threadctl'
make[1]: *** [linux] 错误 1
make[1]:正在离开目录 `/root/Desktop/unix/apue.2e'
make: *** [all] 错误 2
基本没啥大错误
编辑在目录下编辑fig1.c为
#include "apue.h"
#define BUFFSIZE 4096
int main(void)
{
int n;
char buf[BUFFSIZE];
while((n=read(STDIN_FILENO,buf,BUFFSIZE))>0)
if(write(STDOUT_FILENO,buf,n)!=n)
err_sys("write error");
if(n<0)
err_sys("read error");
exit(0);
}

然后运行
gcc fig1.c -I ./include/ -L ./lib -lapue
会生成a.out
./a.out 写啥就打印啥了
说明:
-o就不说了,不写就生成a.out
-I 指定apue.h所在的文件夹
-L 类库的目录
-l 找静态库,比如libapue.a的名称为apue
编译完成后可以用
ldd a.out看看库,是叫库吧,

[b]ctags:[/b]
apt-get install ctags
在source目录下
ctags -R生成tags文件
在~/.vimrc文件中定义
set tags=生成的tags文件全路径
vim fig1.c后用
光标处Ctrl-]键:跳到光标所在单词的tag。Ctrl-T键:跳回原来的位置。g]键(先按g再按]):如果有同名的多个tag,可以用这两个键进行跳转,会提示选择序号


[b]如果是《unix网络编程-卷2-进程间通信》[/b]
[b]比如下载的源码在/root/Desktop/unpv22e/
cd /root/Desktop/unpv22e
./configure
make
vi config.h
注释掉56,57,58行
cd lib
make就成功[/b]了
cd ../pipe # build and test a simple program
[code="java"]# make pipeconf
gcc -g -O2 -D_REENTRANT -Wall -c -o pipeconf.o pipeconf.c
gcc -g -O2 -D_REENTRANT -Wall -o pipeconf pipeconf.o ../libunpipc.a -lrt -lpthread
../libunpipc.a(wrapunix.o): In function `Mktemp':
/root/haoning/unpv22e/lib/wrapunix.c:184: warning: the use of `mktemp' is dangerous, better use `mkstemp'
# ./pipeconf /tmp/
PIPE_BUF = 4096, OPEN_MAX = 1024[/code]
参考http://tieba.baidu.com/f?kz=327192705

在第三章 System V IPC里(在红帽5里测试)
svmsg里的类编译不过
[code="java"]# make
gcc -g -O2 -D_REENTRANT -Wall -c -o ctl.o ctl.c
ctl.c: In function ‘main’:
ctl.c:8: 错误:‘buf’ 的存储大小未知
ctl.c:10: 错误:‘MSG_R’ 未声明 (在此函数内第一次使用)
ctl.c:10: 错误:(即使在一个函数内多次出现,每个未声明的标识符在其
ctl.c:10: 错误:所在的函数内只报告一次。)
ctl.c:10: 错误:‘MSG_W’ 未声明 (在此函数内第一次使用)
ctl.c:18: 错误:‘ulong_t’ 未声明 (在此函数内第一次使用)
ctl.c:18: 错误:expected ‘)’ before ‘info’
ctl.c:19: 警告:格式字符串实参太少
ctl.c:8: 警告:未使用的变量 ‘buf’
make: *** [ctl.o] 错误 1[/code]
改三部分:
1.unpv22e里面的Make.defines
修改
#CFLAGS = -g -O2 -D_REENTRANT -Wall
CFLAGS = -g -O2 -D_GNU_SOURCE -D__USE_GNU -D_REENTRANT -Wall
2.代码里面的,比如ctl.c里面的Msgget方法需要修改
//msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
msqid = Msgget(IPC_PRIVATE, IPC_CREAT|0660);
3. 把所有的ulong_t改成ulong
修改后ctl.c就没错误了,其他错误还有一堆,挨个改吧,诶
把svmsg/ctl.c改成:
#include        "unpipc.h"

int
main(int argc, char **argv)
{
int msqid;
struct msqid_ds info;
struct msgbuf buf;

// msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
msqid = Msgget(IPC_PRIVATE, IPC_CREAT|0660);
buf.mtype = 1;
buf.mtext[0] = 1;
Msgsnd(msqid, &buf, 1, 0);

Msgctl(msqid, IPC_STAT, &info);
printf("read-write: %03o, cbytes = %lu, qnum = %lu, qbytes = %lu\n", info.msg_perm.mode & 0777, (ulong) info.msg_cbytes, (ul
ong) info.msg_qnum, (ulong) info.msg_qbytes);//这里的ulong_t全改成ulong了

system("ipcs -q");

Msgctl(msqid, IPC_RMID, NULL);
exit(0);
}


[code="java"]# make
gcc -g -O2 -D_GNU_SOURCE -D__USE_GNU -D_REENTRANT -Wall -c -o ctl.o ctl.c
gcc -g -O2 -D_GNU_SOURCE -D__USE_GNU -D_REENTRANT -Wall -o ctl ctl.o ../libunpipc.a -lrt -lpthread
../libunpipc.a(wrapunix.o): In function `Mktemp':
/root/haoning/unpv22e/lib/wrapunix.c:184: warning: the use of `mktemp' is dangerous, better use `mkstemp'
gcc -g -O2 -D_GNU_SOURCE -D__USE_GNU -D_REENTRANT -Wall -c -o limits.o limits.c
limits.c: In function ‘main’:
limits.c:19: 错误:‘MSG_R’ 未声明 (在此函数内第一次使用)
limits.c:19: 错误:(即使在一个函数内多次出现,每个未声明的标识符在其
limits.c:19: 错误:所在的函数内只报告一次。)
limits.c:19: 错误:‘MSG_W’ 未声明 (在此函数内第一次使用)
make: *** [limits.o] 错误 1[/code]
继续改limits.c
#include	"unpipc.h"

#define MAX_DATA 64*1024
#define MAX_NMESG 4096
#define MAX_NIDS 4096
int max_mesg;

struct mymesg {
long type;
char data[MAX_DATA];
} mesg;

int
main(int argc, char **argv)
{
int i, j, msqid, qid[MAX_NIDS];

/* 4first try and determine maximum amount of data we can send */
// msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
msqid = Msgget(IPC_PRIVATE, IPC_CREAT|0660);
mesg.type = 1;
for (i = MAX_DATA; i > 0; i -= 128) {
if (msgsnd(msqid, &mesg, i, 0) == 0) {
printf("maximum amount of data per message = %d\n", i);
max_mesg = i;
break;
}
if (errno != EINVAL)
err_sys("msgsnd error for length %d", i);
}
if (i == 0)
err_quit("i == 0");
Msgctl(msqid, IPC_RMID, NULL);

/* 4see how many messages of varying size can be put onto a queue */
mesg.type = 1;
for (i = 8; i <= max_mesg; i *= 2) {
// msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
msqid = Msgget(IPC_PRIVATE, IPC_CREAT|0660);
for (j = 0; j < MAX_NMESG; j++) {
if (msgsnd(msqid, &mesg, i, IPC_NOWAIT) != 0) {
if (errno == EAGAIN)
break;
err_sys("msgsnd error, i = %d, j = %d", i, j);
break;
}
}
printf("%d %d-byte messages were placed onto queue,", j, i);
printf(" %d bytes total\n", i*j);
Msgctl(msqid, IPC_RMID, NULL);
}

/* 4see how many identifiers we can "open" */
mesg.type = 1;
for (i = 0; i <= MAX_NIDS; i++) {
//if ( (qid[i] = msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT)) == -1) {

if ( (qid[i] = msgget(IPC_PRIVATE, IPC_CREAT|0660)) == -1) {
printf("%d identifiers open at once\n", i);
break;
}
}
for (j = 0; j < i; j++)
Msgctl(qid[j], IPC_RMID, NULL);

exit(0);
}

slot.c---->System V IPC .P26 第三章
#include        "unpipc.h"
int
main(int argc, char **argv)
{
int i, msqid;

for (i = 0; i < 10; i++) {
// msqid = Msgget(IPC_PRIVATE, SVMSG_MODE | IPC_CREAT);
msqid = Msgget(IPC_PRIVATE, IPC_CREAT|0660);
printf("msqid = %d\n", msqid);

Msgctl(msqid, IPC_RMID, NULL);
}
exit(0);
}

unpv22e/shm/svmsgread.c中

[code="java"]# make
gcc -g -O2 -D_GNU_SOURCE -D__USE_GNU -D_REENTRANT -Wall -c -o svmsgread.o svmsgread.c
svmsgread.c: In function ‘main’:
svmsgread.c:27: 错误:‘MSG_R’ 未声明 (在此函数内第一次使用)
svmsgread.c:27: 错误:(即使在一个函数内多次出现,每个未声明的标识符在其
svmsgread.c:27: 错误:所在的函数内只报告一次。)
svmsgread.c:55: 警告:格式 ‘%d’ 需要类型 ‘int’,但实参 2 的类型为 ‘s[/code]

O_RDWR替换MSG_R或者MSG_W,网上说的 O_RDWD没找到
2019-06-02 22:19:28 psy6653 阅读数 20

 

 

#inlcude <semaphore.h>


int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);

信号量是随内存区的持续性而存在的。

当参数pshared 为0时,表示一个进程中各个线程共享的内存区,进程终止,信号量随进程的内存消失而消亡;

当参数pshared 为1(非零时),表示基于内存的信号量在不同进程间共享的;

应用在单个生产-单个消费者的实例,同有名信号量效果一样,最主要就是信号量初始化和结束的函数调用发生了变化,然后就是在shared结构体中,有名信号量用的指针,基于内存的信号量使用的是普通变量,两者效果一样,都是传的地址空间;

源码如下:

#include        "pxsem2.h"

#define NBUFF    10

int             nitems;                                 /* read-only by producer and consumer */
struct {        /* data shared by producer and consumer */
  int   buff[NBUFF];
  sem_t mutex, nempty, nstored;         /* semaphores, not pointers */
} shared;

void    *produce(void *), *consume(void *);

int
main(int argc, char **argv)
{
        pthread_t       tid_produce, tid_consume;

        if (argc != 2)
                err_quit("usage: prodcons2 <#items>");
        nitems = atoi(argv[1]);

                /* 4initialize three semaphores */
        Sem_init(&shared.mutex, 0, 1);
        Sem_init(&shared.nempty, 0, NBUFF);
        Sem_init(&shared.nstored, 0, 0);

        Set_concurrency(2);
        Pthread_create(&tid_produce, NULL, produce, NULL);
        Pthread_create(&tid_consume, NULL, consume, NULL);

        Pthread_join(tid_produce, NULL);
        Pthread_join(tid_consume, NULL);

        Sem_destroy(&shared.mutex);
        Sem_destroy(&shared.nempty);
        Sem_destroy(&shared.nstored);
        exit(0);
}
void *
produce(void *arg)
{
        int             i;

        for (i = 0; i < nitems; i++) {
                Sem_wait(&shared.nempty);       /* wait for at least 1 empty slot */
                Sem_wait(&shared.mutex);
                shared.buff[i % NBUFF] = i;     /* store i into circular buffer */
                printf("producter buff[%d] = %d\n", (i%NBUFF), shared.buff[i % NBUFF]);
                Sem_post(&shared.mutex);
                Sem_post(&shared.nstored);      /* 1 more stored item */
        }
        return(NULL);
}

void *
consume(void *arg)
{
        int             i;

        for (i = 0; i < nitems; i++) {
                Sem_wait(&shared.nstored);              /* wait for at least 1 stored item */
                Sem_wait(&shared.mutex);
                if (shared.buff[i % NBUFF] != i)
                        printf("buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
                printf("comsumer buff[%d] = %d\n", (i%NBUFF), shared.buff[i % NBUFF]);
                Sem_post(&shared.mutex);
                Sem_post(&shared.nempty);               /* 1 more empty slot */
        }
        return(NULL);
}
                                           

执行效果如下:

第二部分,多个生产者,单个消费者问题;

 

源码

/* include main */
#include        "pxsem3.h"

#define NBUFF            10
#define MAXNTHREADS     100

int             nitems, nproducers;             /* read-only by producer and consumer */

struct {        /* data shared by producers and consumer */
  int   buff[NBUFF];
  int   nput;
  int   nputval;
  sem_t mutex, nempty, nstored;         /* semaphores, not pointers */
} shared;

void    *produce(void *), *consume(void *);

int
main(int argc, char **argv)
{
        int             i, count[MAXNTHREADS];
        pthread_t       tid_produce[MAXNTHREADS], tid_consume;

        if (argc != 3)
                err_quit("usage: prodcons3 <#items> <#producers>");
        nitems = atoi(argv[1]);
        nproducers = min(atoi(argv[2]), MAXNTHREADS);

                /* 4initialize three semaphores */
        Sem_init(&shared.mutex, 0, 1);
        Sem_init(&shared.nempty, 0, NBUFF);
        Sem_init(&shared.nstored, 0, 0);

                /* 4create all producers and one consumer */
        Set_concurrency(nproducers + 1);
        for (i = 0; i < nproducers; i++) {
                count[i] = 0;
                Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
        }
        Pthread_create(&tid_consume, NULL, consume, NULL);

                /* 4wait for all producers and the consumer */
        for (i = 0; i < nproducers; i++) {
                 Pthread_join(tid_produce[i], NULL);
                printf("count[%d] = %d\n", i, count[i]);
        }
        Pthread_join(tid_consume, NULL);

        Sem_destroy(&shared.mutex);
        Sem_destroy(&shared.nempty);
        Sem_destroy(&shared.nstored);
        exit(0);
}
/* end main */

/* include produce */
void *
produce(void *arg)
{
        for ( ; ; ) {
                Sem_wait(&shared.nempty);       /* wait for at least 1 empty slot */
                Sem_wait(&shared.mutex);

                if (shared.nput >= nitems) {
                        Sem_post(&shared.nempty);
                        Sem_post(&shared.mutex);
                        return(NULL);                   /* all done */
                }

                shared.buff[shared.nput % NBUFF] = shared.nputval;
                shared.nput++;
                shared.nputval++;

                Sem_post(&shared.mutex);
                Sem_post(&shared.nstored);      /* 1 more stored item */
                *((int *) arg) += 1;
        }
}
/* end produce */

/* include consume */
void *
consume(void *arg)
{
        int             i;

        for (i = 0; i < nitems; i++) {
                Sem_wait(&shared.nstored);              /* wait for at least 1 stored item */
                Sem_wait(&shared.mutex);

                if (shared.buff[i % NBUFF] != i)
                        printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);

                Sem_post(&shared.mutex);
                Sem_post(&shared.nempty);               /* 1 more empty slot */
        }
        return(NULL);
}
/* end consume */

线程相关的调用,在互斥锁部分已经介绍的很详细了,与以上单个生产——单个消费者实例相比,最主要的变化就是生产者的执行函数多了以下调用;

if (shared.nput >= nitems) {
        Sem_post(&shared.nempty);
        Sem_post(&shared.mutex);
        return(NULL);                   /* all done */
                }

因为当生产计划完成了,但是多个生产者线程都还没停止,但总有第一个生产者线程先获取最后一个shared.nempty资源,获取后shared.nempty的值就会减1变为0,那么这个线程是能正常结束,但是其他的生产者线程永远阻塞等待shared.nempty;所以有了以上的函数调用,能帮助其他线程都正常执行结束;

 

 

2019-05-24 21:55:27 psy6653 阅读数 30

本章介绍有名信号相关函数的使用,在参考书的第10章。本文主要分为两部分,第一部分为有名信号量相关函数的介绍,第二部分为在生产-消费(生产、消费各一个线程)的案例中使用有名信号量进行同步操作;演示的程序源代码都是从G-F所提供免费源码工程中提取出来的,这样每个函数的执行操作就会更加清楚;

第一部分:

1、有名信号量的相关函数

#include <semaphore.h>

sem_t *sem_open(const char *name, int oflag,.../*mode_t mode, unsigned int value */);

打开或者创建一个有名信号量,oflag的的参数设置同open()函数相类似

#include <semaphore.h>

int sem_close(sem_t *sem);

关闭信号量,但并不删除信号量。但信号量是随内核持续的,下面的demo演示就可看到进程执行结束,信号量依然随内核保持

#include <semaphore.h>

int sem_unlink(sem_t *sem);

从文件系统删除有名消耗量的名字

#include <semaphore.h>
int sem_wait(sem_t *sem); //信号量V操作,减1
int sem_post(sem_t *sem); //信号量P操作,加1
int sem_getvalue(sem_t *sem,int *valp); //返回 信号量sem的当前值

2、源码分析

semcreate源码:

include "pxsem.h"

int main(int argc, char **argv)
{
        int             c, flags;
        sem_t   *sem;
        unsigned int    value;

        flags = O_RDWR | O_CREAT;
        value = 1;
        while ( (c = Getopt(argc, argv, "ei:")) != -1) {
                switch (c) {
                case 'e':
                        flags |= O_EXCL;
                        break;

                case 'i':
                        value = atoi(optarg);
                        break;
                }
        }
        if (optind != argc - 1)
                err_quit("usage: semcreate [ -e ] [ -i initalvalue ] <name>");

        sem = Sem_open(argv[optind], flags, FILE_MODE, value);

        Sem_close(sem);
        exit(0);
}

semunlink源码:

#include        "pxsem.h"

int
main(int argc, char **argv)
{
        if (argc != 2)
                err_quit("usage: semunlink <name>");

        Sem_unlink(argv[1]);

        exit(0);
}

semgetvalue源码:

#include        "pxsem.h"

int
main(int argc, char **argv)
{
        sem_t   *sem;
        int             val;

        if (argc != 2)
                err_quit("usage: semgetvalue <name>");

        sem = Sem_open(argv[1], 0);
        Sem_getvalue(sem, &val);
        printf("value = %d\n", val);

        exit(0);
}

运行结果如下,

在编译完成之后,当创建管道时,发现有sem_open error for /tmp/test: No such file or directory的提示错误,解决的方案是需要在

/dev/shm/目录下创建一个sem.tmp目录;详情可以看https://www.redhat.com/archives/phil-list/2003-January/msg00113.html

semcreate 创建一个有名信号量,随着semcreate进程的结束,可以看到信号量的值依然为1,说明fedora linux系统的有名信号量是随内核保持的。semwait程序把信号量进行减1操作;应用程序每次对信号量进行了关闭操作,但信号量还是存在,实际没有被删除。semunlink程序把信号量从系统进行删除。

sempost源码:

#include        "pxsem.h"

int
main(int argc, char **argv)
{
        sem_t   *sem;
        int             val;

        if (argc != 2)
                err_quit("usage: sempost <name>");

        sem = Sem_open(argv[1], 0);
        Sem_post(sem);
        Sem_getvalue(sem, &val);
        printf("value = %d\n", val);

        exit(0);
}

先挂起的进程先被唤醒执行; fedora Linux由于系统缘故,演示不了信号量为负数的情况;

第二部分:

1、有名信号量在单个生产-单个消费者问题的应用,

/* include main */
#include        "pxsem1.h"

#define NBUFF    10 //缓冲区大小
#define SEM_MUTEX       "mutex"    //同一时刻只能一个线程(生产或者消费)访问数据缓冲区     /* these are args to px_ipc_name() */
#define SEM_NEMPTY      "nempty"   //(缓冲区能够提供生产的空位数量)
#define SEM_NSTORED     "nstored"  //(缓冲区已经生产好的数据个数)

int             nitems;                                 /* read-only by producer and consumer */
struct {        /* data shared by producer and consumer */
  int   buff[NBUFF];
  sem_t *mutex, *nempty, *nstored;
} shared;

void    *produce(void *), *consume(void *);

int
main(int argc, char **argv)
{
        pthread_t       tid_produce, tid_consume;

        if (argc != 2)
                err_quit("usage: prodcons1 <#items>");
        nitems = atoi(argv[1]);

                /* 4create three semaphores */
        shared.mutex = Sem_open(Px_ipc_name(SEM_MUTEX), O_CREAT | O_EXCL,
                                                        FILE_MODE, 1);//信号量mutex初始值为1
        shared.nempty = Sem_open(Px_ipc_name(SEM_NEMPTY), O_CREAT | O_EXCL,
                                                         FILE_MODE, NBUFF);//信号量mutex初始值为10
        shared.nstored = Sem_open(Px_ipc_name(SEM_NSTORED), O_CREAT | O_EXCL,
                                                          FILE_MODE, 0);//信号量mutex初始值为0

        /* 4create one producer thread and one consumer thread */
        Set_concurrency(2);//第7章已经介绍过相关线程函数的作用
        Pthread_create(&tid_produce, NULL, produce, NULL);
        Pthread_create(&tid_consume, NULL, consume, NULL);

        /* 4wait for the two threads */
        Pthread_join(tid_produce, NULL);
        Pthread_join(tid_consume, NULL);

        /* 4remove the semaphores */
        Sem_unlink(Px_ipc_name(SEM_MUTEX));//从系统中删除有名信号量
        Sem_unlink(Px_ipc_name(SEM_NEMPTY));
        Sem_unlink(Px_ipc_name(SEM_NSTORED));
        exit(0);
}
/* end main */

/* include prodcons */
void *
produce(void *arg)
{
        int             i;

        for (i = 0; i < nitems; i++) {
                Sem_wait(shared.nempty);//缓冲区的空位大于0,那么就把nempty减1然后继续执行(初始值为10)        /* wait for at least 1 empty slot */
                Sem_wait(shared.mutex);
                shared.buff[i % NBUFF] = i;     /* store i into circular buffer */
                printf("producter buff[%d] = %d\n", (i%NBUFF), shared.buff[i % NBUFF]);
                Sem_post(shared.mutex);
                Sem_post(shared.nstored);//生产完成,缓冲区已经生产的数据nstored累加1       /* 1 more stored item */
        }
        return(NULL);
}

void *
consume(void *arg)
{
        int             i;
        //逻辑与生产者线程相反
        for (i = 0; i < nitems; i++) {
                Sem_wait(shared.mutex);
                Sem_wait(shared.nstored);//缓冲区已生产的数据大于零,那么就把nstored减1然后继续执行               /* wait for at least 1 stored item */
                if (shared.buff[i % NBUFF] != i)
                        printf("buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
                        printf("consumer buff[%d] = %d\n", (i%NBUFF), shared.buff[i % NBUFF]);
                Sem_post(shared.mutex);
                Sem_post(shared.nempty); //缓冲区取出一个数据,那么就把nempty累加1               /* 1 more empty slot */
        }
        return(NULL);
}
/* end prodcons */
                                                                                                                                                                                               

以下为生产50个数据程序执行的情况,

如果调换消费者线程中的以下两条语句的执行顺序,

Sem_wait(shared.mutex);
Sem_wait(shared.nstored);  

在执行程序的过程中发生了死锁:

执行顺序的不恰当,会让程序死锁。

2019-05-22 19:52:45 psy6653 阅读数 33

一、生产者-消费者问题(非涉及同步),主要介绍线程函数的作用;

全局变量定义

int nitems; /* read-only by producer and consumer */
struct {
  pthread_mutex_t mutex;
  int buff[MAXNITEMS];
  int nput;
  int nval;
}shared = {PTHREAD_MUTEX_INITIALIZER};

void *produce(void *);
void *consume(void *);

main函数如下

int main(int argc, char **argv){

        /* 最多可容纳MAXNTHREADS个线程(实际没用到100个),count[i]数组用来存放第i个线程所执行的次数 */
        int i, nthreads, count[MAXNTHREADS];

        /* tid_produce[i]用来存放第i个线程的的标识 */
        pthread_t tid_produce[MAXNTHREADS], tid_consume;
       
        /* items表示生产的总条目 threads表示将有多少个线程来生产 */
        if (argc != 3)
                err_quit("usage: prodcons2 <#items> <#threads>");

        nitems = min(atoi(argv[1]), MAXNITEMS);/* 命令行参数1(生产的总条目)与10000作比较,取最小值 */
        nthreads = min(atoi(argv[2]), MAXNTHREADS);/* 命令行参数2(线程数)与100作比较,取最小值 */
    
        /* Linux标准调用pthread_setconcurrency()函数 */
        Set_concurrency(nthreads);
/* ------------------------------------------------开始生产------------------------------------------------------------------ */
        /* 创建生产线程,&tid_produce[i]为线程标识地址,tid_produce[i]为线程标识的值,&count[i]为传入地址,count[i] 统计第i个线程所执行的次数(值) */
        for (i = 0; i < nthreads; i++) {
                count[i] = 0;
                Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
        }

        /*pthread_join()作用是等tid_produce[i]的线程执行完,即阻塞运行,如果线程不执行完,我就不继续往下执行*/
        for (i = 0; i < nthreads; i++) {
                Pthread_join(tid_produce[i], NULL);
                printf("count[%d] = %d\n", i, count[i]);
        }
/* ------------------------------------------------生产完成-------------------------------------------------------------------- */
        /* 创建一个消费线程*/
        Pthread_create(&tid_consume, NULL, consume, NULL);
        Pthread_join(tid_consume, NULL);

        exit(0);
}

其中Set_concurrency()的函数,调用的函数原型是pthread_setconcurrency(),其作用是让系统知道并发运行的线程数量,这样每个生产的线程都会执行到;资料介绍如果在某些系统(Slaris)下省略该调用,会导致只有一个生产线程 执行;

生产者函数如下

static int num = 1;
/*只打印一次,查看shared.nput与shared.nval的初值,通过程序执行为0
void *produce(void *arg){
        for ( ; ; ) {
                 /* 加锁 */
                Pthread_mutex_lock(&shared.mutex);
                /* nitems 为要生产的总条目,为命令行参数1,演示输入的1000000*/
                if (shared.nput >= nitems) {
                        /* 解锁 */
                        Pthread_mutex_unlock(&shared.mutex);
                        return(NULL);
                }
                if(num > 0){
                        printf("shared.nput: %d--,shared.nval:
 %d--\n",shared.nput,shared.nval);
                        num--;
                }
                shared.buff[shared.nput] = shared.nval;//对应buff[i]存放的值为i
                shared.nput++;
                shared.nval++;
                /* 解锁 */
                Pthread_mutex_unlock(&shared.mutex);
                *((int *) arg) += 1; 传入参数count[i]的值累加,i表示第i个线程,分配的空间在main函数中
        }
}

消费者函数如下:

void *consume(void *arg){
        int i;
        /* 对生产线程所生产的 nitems 条目进行检查*/
        for (i = 0; i < nitems; i++) {
                if (shared.buff[i] != i)
                        printf("buff[%d] = %d\n", i, shared.buff[i]);
        }
        return(NULL);
}

程序演示结果

以上为先用多个线程生产(多个线程访问共享数据时加锁),后用一个线程进行消费,没有涉及到同步问题,后续的同步问题都时在该程序基础上进行,所以有必要了解基本的源代码;

二、与一相比,变化的有两处,main函数和消费者函数

源码分析,与上面的程序相比,函数pthread_setconcurrency()调用的参数增加了1个,即并行执行的线程为生产和消费线程总数,消费线程执行的顺序改变了,基本上与生产线程一并启动;

int main(int argc, char **argv){

        int             i, nthreads, count[MAXNTHREADS];
        pthread_t       tid_produce[MAXNTHREADS], tid_consume;

        if (argc != 3)
                err_quit("usage: prodcons3 <#items> <#threads>");
        nitems = min(atoi(argv[1]), MAXNITEMS);
        nthreads = min(atoi(argv[2]), MAXNTHREADS);

        Set_concurrency(nthreads + 1);
        for (i = 0; i < nthreads; i++) {
                count[i] = 0;
                Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
        }
    /* 与上面的相比,消费线程启动提前(基本与生产线程一起执行)*/
        Pthread_create(&tid_consume, NULL, consume, NULL);

        for (i = 0; i < nthreads; i++) {
                Pthread_join(tid_produce[i], NULL);
                printf("count[%d] = %d\n", i, count[i]);
        }
        Pthread_join(tid_consume, NULL);

        exit(0);
}

生产者的线程执行函数保持不变,改变的是消费者线程;

消费者线程执行如下

void consume_wait(int i){
        /*等待第i个条目是否生产完成,如果生产完成了就return;如果生产没完成,就while(1)执行[加锁、解锁]动作 (等待生产完成),该方法俗称轮转(spinning)或者轮询(polling)*/
        for ( ; ; ) {
                Pthread_mutex_lock(&shared.mutex);
                if (i < shared.nput) {
                        Pthread_mutex_unlock(&shared.mutex);
                        return;                 
                }
                Pthread_mutex_unlock(&shared.mutex);
        }
}

void * consume(void *arg){
        int     i;

        for (i = 0; i < nitems; i++) {
                consume_wait(i);/*等待第i个条目是否生产完成*/
                if (shared.buff[i] != i)
                        printf("buff[%d] = %d\n", i, shared.buff[i]);
        }
        return(NULL);
}

资料介绍这种轮询消费的方式对cpu是一种浪费;

三、与二相比,变化的有三处,第一是全局变量的定义;第二是生产者的函数;第三是消费者函数;另外main函数与二完全相同,相同的地方就不介绍了;

全局变量的定义

int     nitems;
int     buff[MAXNITEMS];
struct {
  pthread_mutex_t       mutex;
  int                   nput;   /* next index to store */
  int                   nval;   /* next value to store */
} put = { PTHREAD_MUTEX_INITIALIZER };/* 同一、二一样 */

struct {
  pthread_mutex_t       mutex;
  pthread_cond_t        cond;
  int                   nready; /* number ready for consumer */
} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
/* 初始化锁mutex为PTHREAD_MUTEX_INITIALIZER,条件变量cond为PTHREAD_COND_INITIALIZER  */

生产者函数

void* produce(void *arg)
{
        for ( ; ; ) {
                Pthread_mutex_lock(&put.mutex);
                if (put.nput >= nitems) {
                        Pthread_mutex_unlock(&put.mutex);
                        return(NULL);           /* array is full, we're done */
                }
                buff[put.nput] = put.nval;
                put.nput++;
                put.nval++;
                Pthread_mutex_unlock(&put.mutex);

                Pthread_mutex_lock(&nready.mutex);
                if (nready.nready == 0)
                        Pthread_cond_signal(&nready.cond);
                nready.nready++;
                Pthread_mutex_unlock(&nready.mutex);

                *((int *) arg) += 1;
        }
}

生产者多了个pthread_cond_signal()函数,作用是唤醒等待正在等待nready.nready 其值变为非零的线程(消费者),需要与关联的互斥锁(nready.mutex)与条件变量(nready.cond)相互协作;

消费者函数

void *
consume(void *arg)
{
        int             i;

        for (i = 0; i < nitems; i++) {
                Pthread_mutex_lock(&nready.mutex);
                while (nready.nready == 0){
                     
                        Pthread_cond_wait(&nready.cond, &nready.mutex);
                        
                }
                nready.nready--;
               
                Pthread_mutex_unlock(&nready.mutex);

                if (buff[i] != i)
                        printf("buff[%d] = %d\n", i, buff[i]);
        }
        return(NULL);
}

消费者多了个pthread_cond_wait()函数,当nready.nready 为零,资料介绍其作用①给nready.mutex解锁;②把消费者消线程投入睡眠,直到另外线程就条件变量nready.cond调用的pthread_cond_signal()函数来唤醒;

以下为演示结果,为了更加直观,我在程序添加了打印信息:

static int num = 50;

生产者线程增加三处打印

在消费者增加四处打印信息

测试结果如下

 

前面的图是5个线程生产100个条目,后面图是是5个线程生产100w条目(注意关闭消费线程的第一条打印),发现根本没有执行thread_cond_wait()函数;如何让thread_cond_wait()执行呢?

更改主函数中线程启动的顺序,如下

如果只更改顺序还不行(尝试过),最好来点睡眠,调用thread_cond_wait()先让消费者线程睡眠。

测试结果如下:

打印-----4--时,消费者线程已经进入睡眠,可以看到两次结果还不一样,我们只要看前面一张,后面一张不用管(可能跟操作系统执行的算法有关。那么执行顺序到底是怎样呢?

重新修改下打印信息的顺序

void produce(void *arg)
{
        for ( ; ; ) {
                Pthread_mutex_lock(&put.mutex);
                if (put.nput >= nitems) {
                        Pthread_mutex_unlock(&put.mutex);
                        return(NULL);           /* array is full, we're done */
                }
                buff[put.nput] = put.nval;
                put.nput++;
                put.nval++;
                Pthread_mutex_unlock(&put.mutex);

                Pthread_mutex_lock(&nready.mutex);
                if (nready.nready == 0){
                        if(num > 0){
                                printf("-----1--\n");
                                num--;
                        }
                        Pthread_cond_signal(&nready.cond);
                        if(num > 0){
                                printf("-----2--\n");
                                num--;
                        }
                }
                nready.nready++;
                if(num > 0){
                        printf("-----3--\n");
                        num--;
                }
                Pthread_mutex_unlock(&nready.mutex);
                if(num > 0){
                        printf("-----4--\n");
                        num--;
                }

                *((int *) arg) += 1;
        }
}
void *consume(void *arg)
{
        int             i;

        for (i = 0; i < nitems; i++) {
                        //printf("buff[%d] = %d\n", i, buff[i]);
                Pthread_mutex_lock(&nready.mutex);
                while (nready.nready == 0){
                        if(num > 0){
                                printf("-----5--\n");
                                num--;
                        }
                        Pthread_cond_wait(&nready.cond, &nready.mutex);
                        if(num > 0){
                                printf("-----6--\n");
                                num--;
                        }
                }
                nready.nready--;
                if(num > 0){
                        printf("-----7--\n");
                        num--;
                }
                Pthread_mutex_unlock(&nready.mutex);
                if(num > 0){
                        printf("-----8--\n");
                        num--;
                }



                if (buff[i] != i)
                        printf("buff[%d] = %d\n", i, buff[i]);
        }
        return(NULL);
}

运行结果如下:

可以看到thread_cond_wait()先释放消费者线程的锁并让该线程睡眠,在thread_cond_wait()返回前,重新上锁,此时while(nready.nready == 0)为假,终止循环;

2019-11-06 15:00:39 Nick_Zhang_CSDN 阅读数 8
  1. SUSE Linux Enterprise Server 12 SP2 (x86_64) – kernal 4.4.21-69-default

    编译步骤:
    在这里插入图片描述
    make 报错


gcc -g -O2 -D_REENTRANT -Wall -D_POSIX_PTHREAD_SEMANTICS   -c -o daemon_inetd.o daemon_inetd.c
In file included from unpipc.h:7:0,
                 from daemon_inetd.c:1:
../config.h:56:17: error: duplicate ‘unsigned#define uint8_t unsigned char    /* <sys/types.h> */
                 ^
../config.h:56:26: error: two or more data types in declaration specifiers
 #define uint8_t unsigned char    /* <sys/types.h> */
                          ^
../config.h:57:18: error: duplicate ‘unsigned#define uint16_t unsigned short    /* <sys/types.h> */
                  ^
../config.h:57:27: error: duplicate ‘short#define uint16_t unsigned short    /* <sys/types.h> */
                           ^
../config.h:58:18: error: duplicate ‘unsigned#define uint32_t unsigned int    /* <sys/types.h> */
                  ^
../config.h:58:27: error: two or more data types in declaration specifiers
 #define uint32_t unsigned int    /* <sys/types.h> */
解决方法:
		打开unpipc.h , 注释掉 line 56 57 58.
没有更多推荐了,返回首页