精华内容
下载资源
问答
  • barrier and rwlock实现POSIX源码

    千次阅读 2013-02-16 17:48:35
    在《POSIX多线程程序设计》中,作者David R. Butenhof给我们展示了诸多实用pthread_mutex_t 和 pthread_cond_t构建的线程同步工具,我最喜欢的两个是barrier和rwlock。所以用C实现并在虚拟机上爽了一把。先贴出代码...

    前言

    在《POSIX多线程程序设计》中,作者David R. Butenhof给我们展示了诸多实用pthread_mutex_t 和 pthread_cond_t构建的线程同步工具,我最喜欢的两个是barrier和rwlock。所以用C实现并在虚拟机上爽了一把。先贴出代码以及注释,以供大家查阅,共同进步。

    下载源代码

    barrier 等待所有线程进入同一状态

    barrier用于停止线程,直到所有在barrier的线程都到达当前状态,才返回。barrier经常用于确保某些并行算法中所有合作的线程到达同一点。比如启动了N个线程对某大数组进行分段处理,在所有线程处理完之后,再对结果进行合并或展示,这个时候就可以使用barrier。其作用这时就相当于Win 下的WaitForMutipleObjects

    要使所有线程达到同一状态,然后阻塞,最简单的想法就是保存当前已到达指定状态的线程数,如果线程数量与指定的最大线程数不同,线程进入等待状态,相同,达到预设条件,唤醒其他线程。

    要使线程等待,在POSIX中,就理所当然应该使用pthread_cond_t作为等待的对象。同时要修改当前线程的计数,需要一个互斥变量,阻止线程同时读写这个计数。barrier的声明如下:

    /*
     * Structure describing a barrier.
     */
    typedef struct barrier_tag {
    	pthread_mutex_t	mutex;	/* Control access to barrier */
    	pthread_cond_t	cv;	/* Wait for barrier */
    	int		valid;	/* Set when valid */
    	int		threshold;	/* number of threads required */
    	int		counter;	/* current number of threads */
    	unsigned long	cycle;		/* count cycles */
    } barrier_t;
    要提供的操作barrier的api也只需要三个:init, wait, destroy。

    /*
     * Define barrier operations.
     */
    extern int barrier_init(barrier_t * barrier, int count); /* dynamic initialization of barriers */
    extern int barrier_destroy(barrier_t * barrier);         /* destroy the barrier */
    extern int barrier_wait(barrier_t * barrier);            /* wait until the barrier is actived */

    barrier_init

    此函数用于初始化barrier_t,分别初始化mutex, cond变量,设置等待线程和需要等待线程总数,设置谓语动词cycle,设置barrier有效标志

    /*
     * Initialize a barrier for use.
     */
    int barrier_init(barrier_t * barrier, int count)
    {
    	int status;
    
    	barrier->threshold = barrier->counter = count;
    	barrier->cycle = 0;
    	status = pthread_mutex_init(&barrier->mutex, NULL);
    	if (status != 0) {
    		return status;
    	}
    	status = pthread_cond_init(&barrier->cv, NULL);
    	if (status != 0) {
    		pthread_mutex_destroy(&barrier->mutex);
    		return status;
    	}
    	barrier->valid = BARRIER_VALID;
    
    	return 0;
    }


    barrier_destroy

    销毁barrier, 为了防止重复销毁barrier, 先对valid标志进行检查,然后置空valid标志,这样就可以阻止其他线程再试图等待该barrier。最后一一销毁mutex和cond。

    /*
     * Destroy a barrier when done use it.
     */
    int barrier_destroy(barrier_t * barrier)
    {
    	int status, status2;
    
    	if (barrier->valid != BARRIER_VALID) {
    		return EINVAL;
    	}
    
    	/* Set barrier invalid. */
    	status = pthread_mutex_lock(&barrier->mutex);
    	if (status != 0) {
    		return status;
    	}
    	if (barrier->counter != barrier->threshold) {
    		pthread_mutex_unlock(&barrier->mutex);
    		return EBUSY;
    	}
    	barrier->valid = 0;
    	status = pthread_mutex_unlock(&barrier->mutex);
    	if (status != 0) {
    		return status;
    	}
    	
    	status = pthread_mutex_destroy(&barrier->mutex);
    	status2 = pthread_cond_destroy(&barrier->cv);
    	return (status != 0 ? status : status2);
    }

    barrier_wait

    等待所有线程到达同一状态点。试图wait时,先检查该barrier是否有效。然后互斥修改barrier->count。如果count为0,唤醒所有其他线程。否则等待cv。

    /*
     * Wait all threads reached.
     */
    int barrier_wait(barrier_t * barrier)
    {
    	int status, cycle, cancel, tmp;
    	if (barrier->valid != BARRIER_VALID) {
                    return EINVAL;
            }
    
    	status = pthread_mutex_lock(&barrier->mutex);
    	if (status != 0) {
    		return status;
    	}
    
    	cycle = barrier->cycle;
    
    	/* If the last thread arrived, wake others */
    	if (--barrier->counter == 0) {
    		barrier->counter = barrier->threshold;
    		barrier->cycle ++;
    		status = pthread_cond_broadcast(&barrier->cv);
    
    		/* The last thread return -1 rather than 0, so that
    		 * it can be used to do some special serial code following
    		 * the barrier.
    		 */
    		if (status == 0) {
    			status = -1;
    		}
    	} else {
    		/* Wait with cancellation disabled, because barrier_wait
    		 * should not be a cancellation point.
    		 */
    		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel);
    
    		while (cycle == barrier->cycle) {
    			status = pthread_cond_wait(&barrier->cv, &barrier->mutex);
    			if (status != 0) {
    				break;
    			}
    		}
    		pthread_setcancelstate(cancel, &tmp);
    	}
    	
    	pthread_mutex_unlock(&barrier->mutex);
    	return status;
    }

    开始读这段代码时,比较让我迷惑的是当前线程T1先调用pthread_mutex_lock获得mutex的拥有权,然后调用pthread_cond_wait(),最后才调用pthread_mutex_unlock。在该线程T1等待时,并没有释放mutex(pthread_mutex_unlock在方法的最后),当另外一个线程T2要获得mutex时,因为T1已经劫持了这把锁,T2理所当然要进入mutex的等待队列。如果是这样的话这段代码就不应该能够工作了。书中有这样两句话:等待条件变量总是返回锁住的互斥变量。还有,在条件变量上等待会导致以下原子操作:释放相关互斥量,等待其他线程发给该条件变量的信号或广播。也就是说当线程调用pthread_cond_wait时,会释放线程所拥有的mutex,当线程从pthead_cond_wait()返回时,会重新获得mutex的拥有权。如果这样解释起来,这段代码就可以正确运行了。(ps,如果有能力/时间,还是看原著的好,中国的直译式的翻译,是中国软件发展的软肋乎?!)


    完整代码以及测试用例,在这里


    rwlock 读写锁

    读写锁用于线程之间写异步,读同步操作(当然您也可以反过来,如果确实需要的话:P)。这里给出一个读优先的例子。

    实现读写锁,只需要,在写的时候,查看当前是否有读或写线程,如果没有,那么设置当前有一个写线程,执行写操作;如果有等待被唤醒。而在读的时候只需要查看有没有写线程,如果没有,直接去读,不管是否有无其他读线程;如果有,则等待被唤醒。

    要设计一个读写锁,需要一个mutex用于保护读写变量的安全,一个可读条件,一个可写条件。另外还需要当前读变量数,当前写变量数,当前等待的读变量数,当前等待的写变量数。

    读写锁的结构如下:

    typedef struct rwlock_tag{
    	pthread_mutex_t mutex;     /* Access locker    */
    	pthread_cond_t  read;      /* Wait for read    */
    	pthread_cond_t  write;     /* Wait for write   */
    	int             r_wait;    /* Waiting readers  */
    	int             w_wait;    /* Waiting writers  */
    	int             r_active;  /* Activing readers */
    	int             w_active;  /* Activing writers */
    	int             valid;     /* Set when valid   */
    }rwlock_t;

    其操作包括:

    extern int rwlock_init(rwlock_t * rwlock);  
    extern int rwlock_destroy(rwlock_t * rwlock);
    extern int rwlock_readlock(rwlock_t * rwlock);
    extern int rwlock_tryreadlock(rwlock_t * rwlock);
    extern int rwlock_writelock(rwlock_t * rwlock);
    extern int rwlock_trywritelock(rwlock_t * rwlock);
    extern int rwlock_readunlock(rwlock_t * rwlock);
    extern int rwlock_writeunlock(rwlock_t * rwlock);

    读写锁的init和destroy与barrier的类似,这里不再赘述。由于读写的加解锁操作类似,只描述一个即可。这里只讲解一下readlock与readunlock的实现。


    rwlock_readlock

    实现读的原理前面已经叙述过:检查当前有木有正在读的线程,如果木有,增加正在读线程的计数,返回成功。由于读线程可以被取消,为了保证其他的读写线程可以正确工作,不能破坏rwlock的内部变量,当线程被取消时需要进行清理工作。明白原理,直接看代码吧。

    static void  rwlock_readcleanup(void * arg) 
    {
    	rwlock_t * rwlock = (rwlock_t *)arg;
    	--rwlock->r_wait;
    	pthread_mutex_unlock(&rwlock->mutex);
    }
    
    
    int rwlock_readlock(rwlock_t * rwlock)
    {
    	int status;
    
    	if (rwlock->valid != RWLOCK_VALID) {
    		return EINVAL;
    	}
    
    	status = pthread_mutex_lock(&rwlock->mutex);
    	if (status != 0) {
    		return status;
    	}
    
    	if (rwlock->w_active > 0) {
    		rwlock->r_wait++;
    		/* As read lock allow thread be canceled,
    		 * set cleanup to release resource.
    		 */
    		pthread_cleanup_push(rwlock_readcleanup, (void *)rwlock);
    
    		while (rwlock->w_active > 0) { 
    			status = pthread_cond_wait(&rwlock->read,
    					 &rwlock->mutex);
    			if (status != 0) {
    				break;
    			}
    		}
    		pthread_cleanup_pop(0);
    		rwlock->r_wait--;
    	} 
    
    	if (status == 0) {
    		rwlock->r_active ++;
    	}
    
    	pthread_mutex_unlock(&rwlock->mutex);
    	return status;
    }

    rwlock_readunlock

    读解锁时,只需要减少当前活动读线程计数,如果有待写线程,叫醒写线程。

    int rwlock_readunlock(rwlock_t * rwlock)
    {
    	int status, status2;
    	if (rwlock->valid != RWLOCK_VALID) {
    		return EINVAL;
    	}
    
    	status = pthread_mutex_lock(&rwlock->mutex);
    	if (status != 0) {
    		return status;
    	}
    
    	if (--rwlock->r_active == 0) {
    		if (rwlock->w_wait > 0){
    			status = pthread_cond_signal(&rwlock->write);
    		}
    	}
    
    	status2 = pthread_mutex_unlock(&rwlock->mutex);
    	return status != 0 ? status : status2;
    }

    打完收工!


    后记

    最近有些迷茫,原因是年龄的增长和自我感觉个人能力的不足以及目标的遥不可及。特别是每当看到大牛写的代码时,各种羡慕嫉妒恨,时常感到一阵压抑,脑中会充满各种问题,自己什么时候才能变成这样?有什么捷径?将来怎么办?

    冰冻三尺,非一日之寒;为山九仞,岂一日之功。又:合抱之木,生于毫末;九层之台,起于累土;千里之行,始于足下。不积跬步,无以至千里。所以用心做好自己,从平时的点滴做起,每天进步一点点。关键时刻,然后抓住机遇,实现自己的目标。

    大家共勉之!


    展开全文
  • 使用Posix线程实现的coroutine 协程的关键在于栈的保存沿用,有很多其他版本的C实现的coroutine,如:setcontex, setjmp/longjmp。我认为线程拥有自己的数据栈,天然提供栈的沿用,再利用pthread_mutex_t, pthread_...
  • POSIX 兼容readlink -f实现 POSIX shell 脚本。 为什么? readlink和realpath命令未指定 POSIX,并且可能未安装某些环境。 readlink -f ( , , )有许多实现替代方案。 其中一些可能有效,但它有一些边缘情况...
  • 本文简要介绍了Linux实现POSIX定时器的内核代码。内核中对posix定时器的实现代码在kernelposix-timers.c/h中,本文使用的代码是2.6.29;关于用户空间如何使用POSIX定时器请查阅相关man文档。  Linux提供的POSIX...
  • 尝试使用POSIX线程(C语言)实现路由信息协议。 贡献 您可以随意提出更改建议。 克隆存储库 在本地创建一个新分支 合并从远程到本地分支的更改 工作并推动变革 用法 在终端中输入“ make”作为输出(已提供的...
  • 服务器端极简主义POSIX shell重新实现,其他所有内容都是相同的。 参见或而前者,后者(洋葱链接)直接进入。 与PrivateBin的显着差异: i18n尚无法使用-不重要 什么有效: 如何升级到当前的PrivateBin 克隆...
  • Tpool 是基于 POSIX pthread 的 C++ 线程池实现。 它的设计考虑到了简单性。 介绍 Tpool 非常易于使用。 假设你要编写一个web服务器,并在线程池中处理传入的请求,那么你可以编写如下代码来完成这个任务: # ...
  • 亚什3 这是Yet Another Shell(yash)的重新实现项目。 重新实现不太可能完成... 待办事项:详尽。
  • libglob 该存储库包含glob(3)的OpenBSD实现,以及非POSIX功能(例如GLOB_TILDE),在严格的POSIX C库(musl)中不可用。
  • shell-base64 如何使用: $ printf 'Man' | ./encode_base64.sh TWFu $ printf 'TWFu' | ./decode_base64.sh Man $ printf 'Man is distinguished, not only by his reason, but by this singular passion from ...
  • Glibc源码实现

    2018-07-13 17:37:35
    Glibc源代码实现源码分享,包含常见的stdlib,stdio,socket,posix,io,elf等等,分享代码换积分。。。
  • FTP-Client-Server 使用 POSIX API 用 C++ 实现 FTP 客户端和服务器。 实现的命令: 客户: 退出,被动,用户,通过,大小,cd,cdup,ls,get,put,mkdir,pwd,rm,rmdir。 服务器: pasv,端口,用户,通过,退出,cwd,列表,...
  • ocaml-posix-realtime-源码

    2021-05-07 23:54:42
    Posix_realtime当前实现POSIX实时扩展的时钟,计时器和消息传递。 POSIX实时扩展必须在您的操作系统上可用,并且可能在Linux,BSD,Solaris,QNX等的当前版本上可用。Mac OS X当前不支持许多POSIX实时功能,因此将...
  • 该软件包提供SYS V和POSIX消息队列,以在进程之间交换数据。 这两个队列具有相似的功能,但有所不同。 队列在内核中是持久的,除非关闭/取消链接队列或关闭系统。 与multiprocessing.Queue不同,同一队列可以根据其...
  • 某些POSIX API也可以在Windows上实现,并且可以提供所需的跨平台功能。 例如,内置process对象和核心fs模块可以。 可选模块和提供了其他POSIX API,以完成Node.js的提供。 但是,并非所有函数都可以在Windows上调用...
  • µShell是一种简单,轻便,干净,可扩展的,符合POSIX的UNIX Shell实现。 设置 make应该工作。 由于该项目尚处于起步阶段,因此尚无make install支持。 test外壳脚本运行一系列集成测试。 需要一个支持C ++ 11...
  • posix_fadvise源码分析

    2012-06-23 10:42:00
    posix_fadvise是linux上对文件进行预取的系统调用,其中第四个参数int advice为预取的方式,主要有以下几种: POSIX_FADV_NORMAL 无特别建议 重置预读大小为默认值 POSIX_FADV_SEQUENTIAL 将要进行顺序操作 设预读...

    posix_fadvise是linux上对文件进行预取的系统调用,其中第四个参数int advice为预取的方式,主要有以下几种:

    POSIX_FADV_NORMAL 无特别建议 重置预读大小为默认值
    POSIX_FADV_SEQUENTIAL 将要进行顺序操作 设预读大小为默认值的2 倍
    POSIX_FADV_RANDOM 将要进行随机操作 将预读大小清零(禁止预读)
    POSIX_FADV_NOREUSE 指定的数据将只访问一次 (暂无动作)
    POSIX_FADV_WILLNEED 指定的数据即将被访问 立即预读数据到page cache
    POSIX_FADV_DONTNEED 指定的数据近期不会被访问 立即从page cache 中丢弃数据

    其中,POSIX_FADV_NORMAL、POSIX_FADV_SEQUENTIAL、POSIX_FADV_RANDOM 是用来调整预取窗口的大小,POSIX_FADV_WILLNEED则可以将指定范围的磁盘文件读入到page cache中,POSIX_FADV_DONTNEED则将指定的磁盘文件中数据从page cache中换出。

    在kernel源码的mm/fadvise.c中,如下所示:

    SYSCALL_DEFINE(fadvise64_64)(int fd, loff_t offset, loff_t len, int advice)
    {
    struct file *file = fget(fd);
    struct address_space *mapping;
    struct backing_dev_info *bdi;
    loff_t endbyte; /* inclusive */
    pgoff_t start_index;
    pgoff_t end_index;
    unsigned long nrpages;
    int ret = 0;

    /* 判断由文件描述符获得的file指针是否有效 */
    if (!file)
    return -EBADF;

    /* 判断是否是管道 */
    if (S_ISFIFO(file->f_path.dentry->d_inode->i_mode)) {
    ret = -ESPIPE;
    goto out;
    }


    mapping = file->f_mapping;
    if (!mapping || len < 0) {
    ret = -EINVAL;
    goto out;
    }

    if (mapping->a_ops->get_xip_mem) {
    switch (advice) {
    case POSIX_FADV_NORMAL:
    case POSIX_FADV_RANDOM:
    case POSIX_FADV_SEQUENTIAL:
    case POSIX_FADV_WILLNEED:
    case POSIX_FADV_NOREUSE:
    case POSIX_FADV_DONTNEED:
    /* no bad return value, but ignore advice */
    break;
    default:
    ret = -EINVAL;
    }
    goto out;
    }

    /* Careful about overflows. Len == 0 means "as much as possible" */
    endbyte = offset + len;
    if (!len || endbyte < len)
    endbyte = -1;
    else
    endbyte--; /* inclusive */

    bdi = mapping->backing_dev_info;

    switch (advice) {
    case POSIX_FADV_NORMAL:

    /* 将预取窗口调整为设备默认的大小 */
    file->f_ra.ra_pages = bdi->ra_pages;
    spin_lock(&file->f_lock);
    file->f_mode &= ~FMODE_RANDOM;
    spin_unlock(&file->f_lock);
    break;
    case POSIX_FADV_RANDOM:

    /* 将文件的访问模式设为random即不做预取 */
    spin_lock(&file->f_lock);
    file->f_mode |= FMODE_RANDOM;
    spin_unlock(&file->f_lock);
    break;
    case POSIX_FADV_SEQUENTIAL:

    /* 将预取窗口设为默认值的2倍 */
    file->f_ra.ra_pages = bdi->ra_pages * 2;
    spin_lock(&file->f_lock);
    file->f_mode &= ~FMODE_RANDOM;
    spin_unlock(&file->f_lock);
    break;
    case POSIX_FADV_WILLNEED:

    /* 强制读取文件指定范围内的数据 */

    if (!mapping->a_ops->readpage) {
    ret = -EINVAL;
    break;
    }

    /* First and last PARTIAL page! */

    /* 获得起始页、结束页的编号 */
    start_index = offset >> PAGE_CACHE_SHIFT;
    end_index = endbyte >> PAGE_CACHE_SHIFT;

    /* Careful about overflow on the "+1" */

    /* 计算需要读取的页数,并+1向上取整 */

    nrpages = end_index - start_index + 1;
    if (!nrpages)
    nrpages = ~0UL;

    /* 强制读取指定范围的数据 */

    ret = force_page_cache_readahead(mapping, file,
    start_index,
    nrpages);
    if (ret > 0)
    ret = 0;
    break;
    case POSIX_FADV_NOREUSE:

    /* 没有实现,no use 没用 */

    break;
    case POSIX_FADV_DONTNEED:

    /* 将指定范围内的数据从page cache中换出 */

    if (!bdi_write_congested(mapping->backing_dev_info))
    filemap_flush(mapping);

    /* First and last FULL page! */

    /* 计算数据的起始页、结束页的编号 */

    start_index = (offset+(PAGE_CACHE_SIZE-1)) >> PAGE_CACHE_SHIFT;
    end_index = (endbyte >> PAGE_CACHE_SHIFT);

    if (end_index >= start_index)

    /* 将指定页从page cache中换出*/

    invalidate_mapping_pages(mapping, start_index,
    end_index);
    break;
    default:
    ret = -EINVAL;
    }
    out:
    fput(file);
    return ret;
    }


    展开全文
  • )文本终端应用程序(带有基本命令集)文件系统查看器架构amd64 (aka x86_64) - 引导到实现的限制armv7 - 加载用户空间然后崩溃构建依赖nasmGNU Binutils(交叉编译)GCC(用于 ACPICA)pxelinuxlibguestfs-tools ...
  • 这是的实现,该实现扩展以及bc的BSD风格的句点( . )扩展。 有关更多信息,请参见本bc的完整手册。 该bc还包括同一二进制文件中dc的实现,可通过符号链接访问它,该链接实现了所有FreeBSD和GNU扩展。 (如果独立dc...
  • 这个研究的最终目的是为了更好的实现一个在Windows环境下的posix libc库,而且能够直接在visual studio中使用。 二,编译方法 1,安装visula studio 2008,找到crt / src目录。 2,git clone ,从github上下载本...
  • file_cahe_with_threads C 中使用 POSIX 线程在用户空间中实现文件缓存的示例
  • unix-compat:便携式POSIX兼容性层。 该软件包提供了unix软件包各部分的可移植实现。 如果可用,此软件包将重新导出unix软件包。 当它不可用时,将使用可移植的实现
  • 软件包使用库实现了CLI配置读取器,该读取器具有结构嵌入式默认值,环境变量和posix兼容标志解析。 安装 通过运行安装: go get -u github.com/Luzifer/rconfig 或获取特定版本: go get -u gopkg.in/luzifer/...
  • Cbump 是 Open_Bump/Bump 的 POSIX C 实现! 与改进。 原来的凹凸! 是基于网络的实现,Open_Bump 是 Python,两者都不能在 Android 上运行。 因此,您可以直接在您的设备上碰撞任何内核或恢复。 完整的命令行界面...
  • 在helsing / configuration.h中,您可以轻松设置线程数,选择算法实现并进行调整,调整详细程度并启用从检查点恢复。 请务必阅读文档! Windows不兼容posix。 您需要设置虚拟机,或者(如果您使用的是Windows10)...
  • 基于Posix信号量实现的环形生产者消费者模型posix信号量操作接口定义初始化接口等待接口唤醒接口销毁接口如何保证同步&互斥基于Posix信号量实现的环形生产者消费者模型使用的数据结构实现操作代码实现环形队列类...

    posix信号量

    • 作用

    可以完成线程间与进程间的同步与互斥

    • 本质
    资源计数器 + PCB等待队列 + 提供等待和唤醒的接口
    

    在条件变量中,我们了解到条件变量是实现线程间同步功能的一种方式。而posix信号量和条件变量相比,多了一个资源计数器。
    资源计数器的作用就是用来对临界资源进行比较,posix信号量通过判断自身的资源计数器的情况,来得到当前资源是否可用的信息:

    • 可用:则对临界资源进行访问
    • 不可用:则进行阻塞等待,直到被唤醒接口唤醒。
      在这里插入图片描述

    操作接口

    定义

    sem_t 
    

    eg:sem_t sem;

    初始化接口

     #include <semaphore.h>
    int sem_init(sem_t *sem, int pshared, unsigned int value);
    
    • sem:传入信号量的地址
    • pshared:表示当前信号量表示的内容是线程间还是进程间
      0 --》 表示线程间
      1 --》 表示进程间

    当使用sem_init()函数初始化信号量为进程间的时候,会在共享内存中开辟一段共享内存,用来保存信号量的数据结构,其中包括资源计数器和PCB等待队列。所以调用唤醒或者等待接口的本质就是通过操作共享内存实现了不同进程之间的通信,进而实现不同进程间的同步与互斥。
    而在线程之间,一般就把这个信号量设置为全局变量或者类的私有成员变量,就可以通过操作这个变量来实现不同线程之间的同步和互斥。

    • value:实际的资源数量,用于初始化信号量当中的资源计数器,表示当前一共有value个资源

    等待接口

    #include <semaphore.h>
    //阻塞等待
    int sem_wait(sem_t *sem);
    //非阻塞等待
    int sem_trywait(sem_t *sem);
    //带有时间的等待
    int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
    
    • 如果判断当前信号量资源计数器中的值大于0,则能够成功获取信号量;如果资源计数器小于等于0,就说明没有资源,则阻塞该线程

    唤醒接口

    #include <semaphore.h>
    int sem_post(sem_t *sem);
    
    • 作用:用来发布信号量,告诉程序当前资源使用完成,需要归还资源或者让生产者重新生产一个资源,并对信号量中的资源计数器进行+1操作,然后唤醒PCB等待队列中的线程。

    销毁接口

    #include <semaphore.h>
    int sem_destroy(sem_t *sem);
    

    如何保证同步&互斥

    • 同步
      在初始化工作的时候,根据我们制定的资源的数量来初始化posix信号量中的资源计数器
      在这里插入图片描述
    • 互斥
      互斥的本质就是让资源计数器只有两个取值,一个是可以用,一个是不可以用。
      所以我们就可以初始化信号量中的资源计数器的值为1,这样就保证了1为资源可以用,0为资源不可以用。

    在这里插入图片描述

    基于Posix信号量实现的环形生产者消费者模型

    使用的数据结构

    一个用数组模拟的环形队列
    这个队列需要满足先进先出的特性

    实现操作

    • 前后指针,一个是生产者指针,一个是消费者指针。当两个指针指向同一位置的时候,表示表示当前队列为空;而当生产者指针的下一个位置 = 消费者指针的位置时,表示队列已满。

    在这里插入图片描述

    • 计算下一个位置的方式
    (当前位置 + 1) % 数组容量
    
    • 同步
      在生产者生产一个资源后,就通知消费者可以去消费了
    生产:
    sem_init(&_SemProducer,0,数组容量);//初始化
    sem_wait(&_SemProducer);//如果队列满了,资源没有消耗
    arr[] = ?;//生产资源
    sem_post(&_SemConsumer);//通知消费
    消费:
    sem_init(&_SemConsumer,0,0);//初始化
    sem_wait(&_SemConsumer);//如果队列为空,没有资源
    *Data = arr[];//消费资源
    sem_post(&_SemProducerr);//通知生产
    
    • 互斥
    sem_init(&_Lock,0,1);
    

    只设置一个临界资源,让临界资源同时只能被一个线程访问。

    代码实现

    环形队列类

    class LoopQueue
    {
    public:
      LoopQueue()
        : _arr(SIZE)
      {
        _Producer = 0;
        _Consumer = 0;
        _Capacity = SIZE;
        //同步生产者,信号量计数器的值和数组的容量一样大
        sem_init(&_SemProducter,0,_Capacity);
        //同步消费者,一开始并没有一个有效元素,所以初始化为0,
        sem_init(&_SemConsumer,0,0);
    
        //互斥,初始化资源数为1,只能有一个线程同时访问资源
        sem_init(&_Lock,0,1);
      }
    
      //操作生产者 
      void Push(int& Data)
      {
        sem_wait(&_SemProducter);//获取当前信号量到生产者这里,如果拿不到就阻塞等待 
    	//到这里说明,生产者需要生产资源了 
    	
        sem_wait(&_Lock);
        _arr[_Producer] = Data;
        _Producer = (_Producer + 1) % _Capacity;
        sem_post(&_Lock);
        
        sem_post(&_SemConsumer);//通知消费者消费 
      }
      
      //操作消费者 
      void Pop(int* Data)
      {
        sem_wait(&_SemConsumer);//获取当前信号量到消费者这里,如果拿不到就阻塞等待 
    	//到这里说明,消费者可以消费资源了
    	 
        sem_wait(&_Lock);
        *Data = _arr[_Consumer];
        _Consumer = (_Consumer + 1) % _Capacity;
        sem_post(&_Lock);
    
        sem_post(&_SemProducter);//消费完毕,通知生产者生产 
      }
    
      ~LoopQueue()
      {
        sem_destroy(&_SemConsumer);
        sem_destroy(&_SemProducter);
        sem_destroy(&_Lock);
      }
    
    private:
      vector<int> _arr;
      size_t _Capacity;
      // Double pointer
      int _Consumer;
      int _Producer;
    
      //同步
      sem_t _SemProducter;
      sem_t _SemConsumer;
    
      //互斥 
      sem_t _Lock;
    };
    

    消费者线程执行的逻辑

    void* Consumer_start(void* arg)
    {
      LoopQueue* que = (LoopQueue*) arg;
    
      int Data;
      while(1)
      {
        que->Pop(&Data);
        printf("_Consumer [%p]--> [%d]\n",pthread_self(),Data);
      }
      return NULL;
    }
    
    

    生产者线程执行的逻辑

    void* Producer_start(void* arg)
    {
      LoopQueue* que = (LoopQueue*) arg;
      int Data = 1;
      while(1)
      {
        que->Push(Data);
        printf("_Producer [%p] --> [%d]\n",pthread_self(),Data);
        Data++;
      }
      return NULL;
    }
    

    主函数

    int main()
    {
      LoopQueue* que = new LoopQueue();
      pthread_t com_tid[THREADCOUNT],pro_tid[THREADCOUNT];
    
      for(int i = 0; i < THREADCOUNT; i++)
      {
        int ret = pthread_create(&com_tid[i],NULL,Consumer_start,(void*)que);
        if(ret < 0)
        {
          perror("pthread_create consumer");
          return 0;
        }
    
        ret = pthread_create(&pro_tid[i],NULL,Producer_start,(void*)que);
        if(ret < 0)
        {
          perror("pthread_create Producter");
          return 0;
        }
      }
    
      for(int i = 0; i < THREADCOUNT; i++)
      {
        pthread_join(com_tid[i],NULL);
        pthread_join(pro_tid[i],NULL);
      }
    
      delete que;
      que = NULL;
    
      return 0;
    }
    

    运行结果

    在这里插入图片描述

    • 源码地址

    https://github.com/duchenlong/linux-text/blob/master/thread/semqueue.cpp

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 19,199
精华内容 7,679
关键字:

posix源码实现