2019-07-10 10:59:36 weixin_42462202 阅读数 1620
  • sql server 2008 数据库基础应用与开发教程

    本课程全面介绍了sqlserver2008系统的体系架构和功能。本课程以章节的形式,共13章,逐步学习,内容包括sqlserver2008基础知识、t-sql语言、数据库和表、select查询、视图、索引和游标、存储过程与触发器、数据库的备份与还原、安全与权限等。

    5895 人正在学习 去看看 徐新帅

IO多路复用接口Linux内核源码剖析,源码之前,了无秘密

Linux poll内核源码剖析

Linux select内核源码剖析

Linux epoll内核源码剖析

Linux select内核源码剖析


select的原理其实是和poll是一样的,都是采用轮询的方式。select相对于poll也许是比较节省空间吧,因为select是采用bitmap来标志的

本文先讲解一下如何在应用层使用select,然后再深入内核剖析select机制

select应用程序

select可以监听多个文件描述符,直到条件满足或者超时返回

  • select函数原型

    int select(int nfds, fd_set *readfds, fd_set *writefds,
               fd_set *exceptfds, struct timeval *timeout);
    

    nfds:最大的文件描述符加1

    readfds:监听可读集合

    writefds:监听可写集合

    exceptfds:监听异常集合

    timeout:超时时间

    对于这些集合,有一组函数可以进行设置

    void FD_SET(int fd, fd_set *set); //设置文件描述符到集合中
    void FD_CLR(int fd, fd_set *set); //从集合中清除文件描述符标志
    int  FD_ISSET(int fd, fd_set *set); //判断文件描述符在集合中是否被标志
    void FD_ZERO(fd_set *set); //清空集合
    

demo

下面这个程序使用select监听标准输入,直到标准输入可读时,返回并打印内容

#include <stdio.h>
#include <sys/select.h>

int main(int argc, char* argv[])
{
    fd_set rfds;
    int nfds;
    int i;
    char buf[1024];
    int len;

    FD_ZERO(&rfds); //清空集合

    FD_SET(0, &rfds); //标准输入
    nfds = 0 + 1; //最大文件描述符加1

    while(1)
    {
        fd_set fds = rfds;
        
        /* 开始监听 */
        if(select(nfds, &fds, NULL, NULL, NULL) < 0)
        {
            printf("select err.\n");
            return -1;
        }

        for(i = 0; i < nfds; i++)
        {
            /* 判断是否满足条件 */
            if(FD_ISSET(i, &fds))
            {
                len = read(i, buf, 1024);
                buf[len] = '\0';
                printf("read buf: %s\n", buf);
            }
        }
    }

    return 0;
}

select机制内核源码剖析

我们先来看看fd_set是什么东西

typedef __kernel_fd_set		fd_set;

typedef struct {
	unsigned long fds_bits [__FDSET_LONGS]; //定义一个数组
} __kernel_fd_set;

从上面可以看出,fd_set其实就是一个数组,内核用一个位来表示一个文件描述符,从内核定义来看,一共有1024个位

下面再来看看这四个设置函数

void FD_SET(int fd, fd_set *set); //设置文件描述符到集合中
void FD_CLR(int fd, fd_set *set); //从集合中清除文件描述符标志
int  FD_ISSET(int fd, fd_set *set); //判断文件描述符在集合中是否被标志
void FD_ZERO(fd_set *set); //清空集合
#define FD_SET(fd,fdsetp)	__FD_SET(fd,fdsetp)
#define FD_CLR(fd,fdsetp)	__FD_CLR(fd,fdsetp)
#define FD_ISSET(fd,fdsetp)	__FD_ISSET(fd,fdsetp)
#define FD_ZERO(fdsetp)		__FD_ZERO(fdsetp)

先看FD_SET,其实就是将特定的位置1

#define __FD_SET(fd, fdsetp) \
		(((fd_set *)(fdsetp))->fds_bits[(fd) >> 5] |= (1<<((fd) & 31)))

再看看FD_CLR,其实就是将特定的位置0

#define __FD_CLR(fd, fdsetp) \
		(((fd_set *)(fdsetp))->fds_bits[(fd) >> 5] &= ~(1<<((fd) & 31)))

看看FD_ISSET,其实就是判断特定的位是否被置1

#define __FD_ISSET(fd, fdsetp) \
		((((fd_set *)(fdsetp))->fds_bits[(fd) >> 5] & (1<<((fd) & 31))) != 0)

看一下FD_ZERO,其实就是将所有的位置0

#define __FD_ZERO(fdsetp) \
		(memset (fdsetp, 0, sizeof (*(fd_set *)(fdsetp))))

至此我们直到,fd_set其实就是一个数组,然后里面每一个位都表示一个文件描述符的状态,我们将我们要监听的文件描述符对应的位标志好后,传递给内核,内核会将状态通过位标记返回到应用层

下面就马上来分析select对应的系统调用

select对应的系统调用如下

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
		fd_set __user *, exp, struct timeval __user *, tvp)

将其展开后得到如下函数

long sys_select(int n, fd_set __user * inp, fd_set __user * outp,
                    fd_set __user * exp, struct timeval __user * tvp)
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
		fd_set __user *, exp, struct timeval __user *, tvp)
{
    /* 从应用层会传递过来三个需要监听的集合,可读,可写,异常 */
    ret = core_sys_select(n, inp, outp, exp, to);
    
    return ret;
}

接下来看core_sys_select

int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
			   fd_set __user *exp, struct timespec *end_time)
{
    /* 在栈上分配一段内存 */
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
    
    size = FDS_BYTES(n); //n个文件描述符需要多少个字节
    
    /* 
     * 如果栈上的内存太小,那么就重新分配内存
     * 为什么是除以6呢?
     * 因为每个文件描述符要占6个bit(输入:可读,可写,异常;输出结果:可读,可写,异常)
     */
    if (size > sizeof(stack_fds) / 6)
		bits = kmalloc(6 * size, GFP_KERNEL);
    
    /* 设置好bitmap对应的内存空间 */
    fds.in      = bits; //可读
	fds.out     = bits +   size; //可写
	fds.ex      = bits + 2*size; //异常
	fds.res_in  = bits + 3*size; //返回结果,可读
	fds.res_out = bits + 4*size; //返回结果,可写
	fds.res_ex  = bits + 5*size; //返回结果,异常
    
    /* 将应用层的监听集合拷贝到内核空间 */
    get_fd_set(n, inp, fds.in);
    get_fd_set(n, outp, fds.out);
    get_fd_set(n, exp, fds.ex);
    
    /* 清空三个输出结果的集合 */
	zero_fd_set(n, fds.res_in);
	zero_fd_set(n, fds.res_out);
	zero_fd_set(n, fds.res_ex);
    
    /* 调用do_select阻塞,满足条件时返回 */
    ret = do_select(n, &fds, end_time);
    
    /* 将结果拷贝回应用层 */
    set_fd_set(n, inp, fds.res_in);
    set_fd_set(n, outp, fds.res_out);
    set_fd_set(n, exp, fds.res_ex);
    
    return ret;
}

下面来看一看do_select函数

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
    for (;;) {
        /* 遍历所有监听的文件描述符 */
    	for (i = 0; i < n; ++rinp, ++routp, ++rexp)
        {
            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1)
            {
                /* 调用每一个文件描述符对应驱动的poll函数,得到一个掩码 */
                mask = (*f_op->poll)(file, wait);
                
                /* 根据掩码设置相应的bit */
                if ((mask & POLLIN_SET) && (in & bit)) {
                    res_in |= bit;
                    retval++;
                }
                
                if ((mask & POLLOUT_SET) && (out & bit)) {
                    res_out |= bit;
                    retval++;
                }
                
                if ((mask & POLLEX_SET) && (ex & bit)) {
                    res_ex |= bit;
                    retval++;
                }
            }
        }
    
        /* 如果条件满足,则退出 */
        if (retval || timed_out || signal_pending(current))
            break;
        
        /* 调度,进程睡眠 */
        poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack);
    }
}

do_select会遍历所有要监听的文件描述符,调用对应驱动程序的poll函数,驱动程序的poll一般实现如下

static unsigned int button_poll(struct file *fp, poll_table * wait)
{
	unsigned int mask = 0;

    /* 调用poll_wait */
	poll_wait(fp, &wq, wait); //wq为自己定义的一个等待队列头

	/* 如果条件满足,返回相应的掩码 */
	if(condition)
		mask |= POLLIN; 

	return mask;
}

看看poll_wait做了什么

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && wait_address)
		p->qproc(filp, wait_address, p);
}

p->qproc在之前又被初始化为__pollwait

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
				poll_table *p)
{
    /* 分配一个结构体 */
    struct poll_table_entry *entry = poll_get_entry(pwq);
    
    /* 将等待队列元素加入驱动程序的等待队列头中 */
	add_wait_queue(wait_address, &entry->wait); 
}

由此可知,do_select中对每一个文件描述符调用(*f_op->poll)(file, wait),是为每一个文件描述符申请一个等待队列元素,然后将其添加到对应驱动程序的等待队列中,等待条件满足时唤醒

我们再回到do_select函数里,第一遍遍历调用``(*f_op->poll)(file, wait)`是为了为每一个文件描述符申请一个等待队列元素,将其添加到对应驱动程序的等待队列中,然后会睡眠等待,当有条件满足时,对应的驱动会通过等待队列唤醒该进程,然后进行第二次遍历,此时得到一个掩码,然后设置好每一个文件描述符状态,退出

至此,select也就分析完了

总结

select和poll的实现原理是一样的,只是select采用bitmap的方式来标记文件描述符,然后select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是1024

select会有两次遍历所有监听的文件描述符,第一次是将等待队列元素添加到对应驱动程序的等待队列中,然后调度,睡眠等待唤醒,第二次是当条件满足时,驱动程序会唤醒等待队列,然后select会进行第二次遍历,获取一个掩码,设置好每一个文件描述符的bitmap,然后再将结果拷贝回用户空间

2018-05-21 20:58:52 Function_Dou 阅读数 1006
  • sql server 2008 数据库基础应用与开发教程

    本课程全面介绍了sqlserver2008系统的体系架构和功能。本课程以章节的形式,共13章,逐步学习,内容包括sqlserver2008基础知识、t-sql语言、数据库和表、select查询、视图、索引和游标、存储过程与触发器、数据库的备份与还原、安全与权限等。

    5895 人正在学习 去看看 徐新帅

select源码分析


select()

函数是从SYSCALL_DEFINE5(select, ...)开始. 可以简单的将SYSCALL_DEFINEx理解为系统定义的系统函数, 如果想了解 SYSCALL_DEFINE 可以看一下.

具体的执行流程是 :

  • 将时间定义从用户空间复制到内核空间中, 进行时间片的设置, 如果为0或不合法就设置为默认值, 否则就设置为传入的时间.
  • 调用core_sys_select函数, 以实现等待消息到来, 轮询等主要操作
  • 调用timeval_compare返回执行完剩余的时间
  • 最后将返回的时间使用copy_to_user复制到用户空间
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, fd_set __user *, exp, struct timeval __user *, tvp)
{
    s64 timeout = -1;
    struct timeval tv;
    int ret;

    if (tvp)
    {
        // 将数据从用户空间拷贝到内核空间的tv中
        if (copy_from_user(&tv, tvp, sizeof(tv)))
            return -EFAULT;
        ...
    }

    // 设置 fds 结构的参数并且等待消息的到来, 或者时间片没有结束调度程序
    ret = core_sys_select(n, inp, outp, exp, &timeout);

    // 设置时间片
    if (tvp)
    {
        ...
        // 返回执行完后剩余时间
        if (timeval_compare(&rtv, &tv) >= 0)
            rtv = tv;
        if (copy_to_user(tvp, &rtv, sizeof(rtv))) 
        ...
    }
    return ret;
}

core_sys_select

因为select的主要功能都是do_select函数, 这里我们先分析一下关于core_sys_select函数.

  • 获取文件文件描述符表并存放在fdtable
  • fdtable读, 写, 错误分配空间, 并初始化
  • 将select传入的readfds,writefds, errorfds参数从用户空间复制到内核空间的fdtable对应的读, 写, 错误中. 这里需要解释一下, fdselect主要是保存之后要将来的信号返回给用户空间的.
  • 调用do_select, 轮询等待消息的到来, 并且将消息的文件描述符保存在fds结构体中
  • fds保存的读, 写, 错误集合从内核空间复制到用户空间
// 参数满足 int select(int maxfdp,fd_set *readfds,fd_set *writefds,fd_set *errorfds,struct timeval*timeout); 
// typedef __kernel_fd_set fd_set
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,fd_set __user *exp, s64 *timeout)
{
    fd_set_bits fds;
    void *bits;
    int ret, max_fds;
    unsigned int size;
    struct fdtable *fdt;
    /*
    #define FRONTEND_STACK_ALLOC    256
    #define SELECT_STACK_ALLOC  FRONTEND_STACK_ALLOC
    */ 
    // 计算出来有32位, 所以数组的大小也定义的是32
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
    // 文件描述符表
    fdt = files_fdtable(current->files);
    ...
    fds.in      = bits;
    fds.out     = bits +   size;
    fds.ex      = bits + 2*size;
    fds.res_in  = bits + 3*size;
    fds.res_out = bits + 4*size;
    fds.res_ex  = bits + 5*size;
    // 实现将数据用户空间复制到内核空间中, 这是是把数据从inp复制到内核的fds空间中
    if ((ret = get_fd_set(n, inp, fds.in)) ||
        (ret = get_fd_set(n, outp, fds.out)) ||
        (ret = get_fd_set(n, exp, fds.ex)))
        goto out;
    // res初始化为0
    zero_fd_set(n, fds.res_in);
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);
    // 调用do_select函数, 检验, 等待消息到来, 并且do_select函数会把到来消息的文件描述符存放在fds结构体中
    ret = do_select(n, &fds, timeout);
    ...
    // 实现将数据从内核空间复制到内核空间中
    if (set_fd_set(n, inp, fds.res_in) ||
        set_fd_set(n, outp, fds.res_out) ||
        set_fd_set(n, exp, fds.res_ex))
        ret = -EFAULT;
    ...
    return ret;
}

在这里可以看出来core_sys_select函数也是只是分配空间, 复制到内核中, 我们还没有看到阻塞等待消息的代码, 所以重要的还是在do_select函数中. 可以看到在最后core_sys_select调用了do_select函数, 等待消息的到来.

关于stack_fds这个数组, 要存放的是6个位图, 分别对应用户态传入的存放监听读、写、异常三个操作的文件描述符集合,以及这三个操作在select执行过后需要返回的三个集合。这是 select 的机制,每次执行 select() 之后,函数把“就绪”的文件描述符留下,返回。下一次,再次执行 select() 时,需要重新把需要监听的文件描述符传入。

do_select

不过分析函数前, 先看看三个待会会用到的宏定义. POLLIN_SET是检查输入消息, 同理POLLOUT_SET检查输出, POLLEX_SET检查错误.

#define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)
#define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
#define POLLEX_SET (POLLPRI)

好了, 现在可以开始分析do_select源码.

  • 获得系统能支持的最大文件描述符
  • 调用poll_initwait函数设置回调消息到来时的回调函数
  • 轮询, 不断的检查链表中有没有消息到来
  • 没有消息就直接启动调度程序, 等待一定的时间片返回进程重复判断消息到来
  • 有消息到来, 或者设置的时间到达后, 退出轮询, 判断消息的类型(输入, 输出, 还是错误), 并保存其消息的文件描述符
  • 将进程设置为运行态, 清除集合, 返回
int do_select(int n, fd_set_bits *fds, s64 *timeout)
{
    struct poll_wqueues table;
    poll_table *wait;
    int retval, i;
    ...
    // 获取最大文件描述符
    retval = max_select_fd(n, fds);
    ...
    n = retval;
    // 初始化等待队列, 并且设置回调函数, 有就绪文件描述符是就执行回调
    poll_initwait(&table);
    wait = &table.pt;
    if (!*timeout)
        wait = NULL;
    retval = 0;
    for (;;)
    {
        unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
        long __timeout;
        // 进程设置为可中断的
        set_current_state(TASK_INTERRUPTIBLE);
        // select的三个参数, 读, 写, 错误. 以及三个返回参数.
        inp = fds->in; outp = fds->out; exp = fds->ex;
        rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
        // 遍历所有的设置的文件描述符
        for (i = 0; i < n; ++rinp, ++routp, ++rexp) 
        {
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
            const struct file_operations *f_op = NULL;
            struct file *file = NULL;
            in = *inp++; out = *outp++; ex = *exp++;
            // 确定该位有设置好等待返回的进程描述符, 如果没有参数继续自增, 进程等待, 有设置好的描述符, 就执行下一个for循环
            all_bits = in | out | ex;
            if (all_bits == 0)
            {
                i += __NFDBITS;
                continue;
            }

            // 每次遍历一位, 也就是遍历一个文件描述符
            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) 
            {
                int fput_needed;
                // 是否超过最大设置的文件描述符
                if (i >= n)
                    break;
                // 判断是哪一位的有消息到来
                if (!(bit & all_bits))
                    continuea;
                // 从文件描述符中获取文件结构体
                file = fget_light(i, &fput_needed);
                if (file) 
                {
                    f_op = file->f_op;
                    mask = DEFAULT_POLLMASK;
                    if (f_op && f_op->poll)
                        // 调用文件所对应的具体方法, 具体操作, 比如是POLLIN操作, 检测了文件是否就绪,而且还把当前进程加入等待队列,如果该文件描述符就绪,则会触发回调,以及唤醒该进程
                        mask = (*f_op->poll)(file, retval ? NULL : wait);
                    fput_light(file, fput_needed);
                    if ((mask & POLLIN_SET) && (in & bit)) // 与掩码mask相与, 判断是否是输入操作, 同时检测是否该文件描述符设置了输入操作
                    {
                        res_in |= bit;  // 返回的res_op设置为POLLIN
                        retval++;
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) 
                    {
                        res_out |= bit;
                        retval++;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) 
                    {
                        res_ex |= bit;
                        retval++;
                    }
                }
            }
            if (res_in)
                *rinp = res_in;
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
            cond_resched();
        }
        wait = NULL;
        // 如果时间到了或者有事件到来
        if (retval || !*timeout || signal_pending(current))
            break;
        ...

        // 没有消息到来, 并且时间没有结束, 那么就进行调度程序, 等待时间片结束后返回到进程, 重新判断消息是否到来, 重复操作
        __timeout = schedule_timeout(__timeout);
        if (*timeout >= 0)
            *timeout += __timeout;
    }
    // 将进程设置为运行态
    __set_current_state(TASK_RUNNING);
    // 释放页中的所有数据
    // 所以我们在不断轮询的时侯, 每次都会重新为 fds 赋值, 因为每次操作完的时侯都会将传入的 fds 修改, 清除, 所以我们需要重复为 fds 恢复
    poll_freewait(&table);

    return retval;
}

这里也就可以看出select的效率, 时间复杂度居然是O(n3), 随着等待的文件描述符越来越多, 那么等待的时间也就越长, 而且等待的数据也是很有限, 对于一个服务器来说这是在太少了. 剩下的操作已经在注释中写的很清楚了. 希望对读者有用.


总结

我总结了一下函数的调用历程, 这样调理也就更加的清楚了.

这里写图片描述
select()函数首先是将参数时间调用copy_from_user将其从用户空间复制到内核空间中, 然后对时间片进行设置(如果时间<=0 或非法将设置为默认一直等待), 然后调用core_sys_select函数, 分配一个fds的数据结构来保存关于传入参数in, out, ex集合的数据. 接着就调用了函数do_select, 先是在设置的时间段进行等待, 遍历所有传入的文件描述符, 如果有消息到来就直接返回给用户空间, 没有的消息, 就先进行进程调度, 同时设置一个回调时间, 在时间片结束后又回调到该进程, 继续判断消息的到来, 还是没有消息就重复调度, 直到有消息到来. 消息到来后, 先是遍历所有的消息, 确定是集合中的消息. 是文件描述符集合中的消息, 就保存该文件描述符, 退出轮询, 并且将文件集合清除, 只保留一个描述符, 然后将描述符返回到core_sys_select, 然后该函数将返回的描述符从内核空间复制到用户空间, 通过FD_ISSET来进行确认. 最后select将剩余的时间返回.

2019-07-11 09:41:31 weixin_42462202 阅读数 2556
  • sql server 2008 数据库基础应用与开发教程

    本课程全面介绍了sqlserver2008系统的体系架构和功能。本课程以章节的形式,共13章,逐步学习,内容包括sqlserver2008基础知识、t-sql语言、数据库和表、select查询、视图、索引和游标、存储过程与触发器、数据库的备份与还原、安全与权限等。

    5895 人正在学习 去看看 徐新帅

IO多路复用接口Linux内核源码剖析,源码之前,了无秘密

Linux poll内核源码剖析

Linux select内核源码剖析

Linux epoll内核源码剖析

Linux epoll内核源码剖析


前面介绍了select/poll,此文章将讲解epoll,epoll是select/poll的增强版,epoll更加高效,当然也更加复杂

epoll高效的一个重要的原因是在获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就可以

本文将先讲解如何使用epoll,然后再深入Linux内核源码分析epoll机制

epoll应用程序编写

epoll机制提供了三个系统调用epoll_createepoll_ctlepoll_wait

下面是这三个函数的原型

  • epoll_create

    int epoll_create(int size);
    

    此函数返回一个epoll的文件描述符

  • epoll_ctl

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    

    此函数是添加、删除、修改事件

    epfd:epoll的文件描述符

    op:表示选项,EPOLL_CTL_ADD(添加事件)、EPOLL_CTL_MOD(修改事件)、EPOLL_CTL_DEL(删除事件)

    fd:要操作的文件描述符

    event:事件信息

  • epoll_wait

    int epoll_wait(int epfd, struct epoll_event *events,
                   int maxevents, int timeout);
    

    此函数是等待条件满足,放回值为准备就绪的事件数

    epfd:epoll的文件描述符

    events:返回的事件信息

    maxevents:要等待的最大事件数

    timeout:超时时间

    • epoll_event

      struct epoll_event
      {
        uint32_t events;		//epoll事件
        epoll_data_t data; 	//联合体,一般表示文件描述符
      };
      

demo

下面这段代码,监听标准输入,当标准输入可读时,就打印读取到的信息

#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>

#define MAX_EVENTS 10

int main(int argc, char* argv[])
{
    struct epoll_event ev, events[MAX_EVENTS];
    int nfds, epollfd, len;
    char buf[1024];
    int n;

    epollfd = epoll_create(10);
    if (epollfd == -1)
    {
        perror("epoll_create");
        exit(-1);
    }

    ev.events = EPOLLIN;
    ev.data.fd = 0;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, 0, &ev) == -1)
    {
        perror("epoll_ctl: listen_sock");
        exit(-1);
    }

    while(1)
    {
        nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (nfds == -1) 
        {
            perror("epoll_pwait");
            exit(-1);
        }

        for (n = 0; n < nfds; ++n)
        {
            if(events[n].events & EPOLLIN)
            {
                len = read(events[n].data.fd, buf, 1024);
                buf[len] = '\0';
                printf("fd:%d; read buf:%s\n", events[n].data.fd, buf);
            }
        }
    }

    return 0;
}

epoll机制内核源码剖析

由上面的应用程序可知,epoll机制有三个系统调用,分别为epoll_createepoll_ctlepoll_wait,下面将详细地来分析这三个系统调用

epoll_create

epoll_create对应地系统调用如下

SYSCALL_DEFINE1(epoll_create, int, size)

展开后变成

long sys_epoll_create(int size)

接下来看看epoll_create做了什么事情

SYSCALL_DEFINE1(epoll_create, int, size)
{
    if (size <= 0)
        return -EINVAL;
    
    return sys_epoll_create1(0);
}

由上面的程序可以看出,指定size并没有什么作用,只是判断是否小于等于0而已

下面看一看sys_epoll_create1

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    struct eventpoll *ep = NULL;
    
    ep_alloc(&ep); //分配内存
    
    /* 定义一个epoll的文件描述符 */
    error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
                     O_RDWR | (flags & O_CLOEXEC));
    
    /* 返回文件描述符 */
    return error;
}

eventpoll起到管理epoll事件的作用,这个结果贯穿整个epoll机制,下面来看看这个结构体

struct eventpoll {
    /* 等待队列头,被sys_epoll_wait使用 */
	wait_queue_head_t wq;
	
    /* 保准准备就绪的文件描述符的一个链表 */
    struct list_head rdllist;
    
    /* 红黑树节点,epoll使用红黑树存储事件信息 */
    struct rb_root rbr;
    
   	...
};

epoll_create的作用是申请一个eventpoll结构体,申请一个epoll文件描述符,然后放回到用户空间

epoll_ctl

epoll_ctl用于添加,删除,修改事件

对应的系统调用如下

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)

展开后得到

long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user * event)

下面来详细分析

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
    
    copy_from_user(&epds, event, sizeof(struct epoll_event)); //拷贝事件
    
    switch (op) {
    	case EPOLL_CTL_ADD: //添加事件
            ep_insert(ep, &epds, tfile, fd);
            break;
         case EPOLL_CTL_DEL: //删除事件
            ep_remove(ep, epi);
            break;
    	case EPOLL_CTL_MOD: //修改事件
            ep_modify(ep, epi, &epds);
            break;
    }
    
}

在这里主要分析ep_insert添加事件,在分析之前,先将一下epitem,epoll在内核实现的时候,事件是以epitem为单位,保存到红黑树的,下面看一看epitem结构体

struct epitem {
    struct rb_node rbn; //红黑树
    struct list_head rdllink; //就绪链表
    struct eventpoll *ep; //用于指向eventpoll
    struct epoll_event event; //event事件
};

好,接下来分析ep_insert

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd)
{
    struct epitem *epi;
    struct ep_pqueue epq; //ep_pqueue结构体见下方
    
    epi = kmem_cache_alloc(epi_cache, GFP_KERNEL); //分配一个epoll项
    epi->ep = ep; //指向所属的eventpoll
    epi->event = *event; //赋值事件
    
    /* 初始化poll_table;pt->qproc = ep_ptable_queue_proc; */
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    
    /* 调用驱动程序的poll函数,具体下面分析 */
    tfile->f_op->poll(tfile, &epq.pt);
    
    /* 添加到红黑树中 */
    ep_rbtree_insert(ep, epi);
}
  • ep_pqueue

    struct ep_pqueue {
    	poll_table pt; //用于调用文件描述符的驱动程序的poll使用
    	struct epitem *epi; //epoll项
    };
    

下面分析上面的tfile->f_op->poll(tfile, &epq.pt)

一般驱动程序的poll实现如下

static unsigned int poll(struct file *fp, poll_table * wait)
{
	unsigned int mask = 0;

    /* 调用poll_wait */
	poll_wait(fp, &wq, wait); //wq为自己定义的一个等待队列头

	/* 如果条件满足,返回相应的掩码 */
	if(condition)
		mask |= POLLIN; 

	return mask;
}

看一看poll_wait什么内容

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && wait_address)
		p->qproc(filp, wait_address, p);
}

调用了p->qproc(filp, wait_address, p)函数,还记得上面程序将其初始化init_poll_funcptr(&epq.pt, ep_ptable_queue_proc)

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt); //找到对应的epoll项
    struct eppoll_entry *pwq; //具体定义看下面
    
    pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL)//分配内存
        
    /* 初始化等待队列的唤醒函数,当驱动程序唤醒等待队列时,会调用此函数(ep_poll_callback) */
    init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
    pwq->base = epi; //设置好epoll项
    add_wait_queue(whead, &pwq->wait); //将等待队列元素添加到驱动的等待队列中
}
  • eppoll_entry

    struct eppoll_entry {
    	struct epitem *base; //指向epoll项
        wait_queue_t wait; //等待队列元素
        wait_queue_head_t *whead; //等待队列头
    };
    

回到ep_insert函数,我们可知道tfile->f_op->poll(tfile, &epq.pt)做了什么事情

  • 1、添加等待队列元素到驱动的等待队列中
  • 2、初始化驱动唤醒等待队列时调用的函数,init_waitqueue_func_entry(&pwq->wait, ep_poll_callback)

epoll_wait

epoll_wait对应的系统调用如下

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)

展开后变成

long sys_epoll_wait(int epfd, struct epoll_event __user * events, int maxevents, int timeout)

下面来详细分析

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	ep_poll(ep, events, maxevents, timeout);
}
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
	init_waitqueue_entry(&wait, current); //初始化等待队列元素
    __add_wait_queue_exclusive(&ep->wq, &wait); //将等待队列元素添加到event_poll的等待队列中
    
    for (;;) {
    	set_current_state(TASK_INTERRUPTIBLE); //设置当前进程状态
        
        /* 如果就绪链表中有元素或者超时则退出 */
        if (!list_empty(&ep->rdllist) || timed_out)
            break;
    
        /* 任务调度,睡眠 */
    	schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS);
    }
    
    /* 设置进程状态 */
    set_current_state(TASK_RUNNING);

    /* 返回信息到应用层 */
	ep_send_events(ep, events, maxevents));

    /* 返回就绪的事件数 */
	return res;
}

当ep_poll调用schedule_hrtimeout_range的时候,会睡眠等待,直到驱动程序调用wake_up唤醒等待队列时,会再次醒过来,而wake_up的实现如下

...
wait_queue_t *curr;
curr->func(curr, mode, wake_flags, key)
...

会调用等待队列中的等待元素的func函数,epoll添加到驱动程序的等待队列的等待元素的func已经被初始化init_waitqueue_func_entry(&pwq->wait, ep_poll_callback)

下面来分析ep_poll_callback函数,来看看驱动程序是怎么唤醒进程的

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    struct epitem *epi = ep_item_from_wait(wait); //得到对应的epoll项
    struct eventpoll *ep = epi->ep; //得到eventpoll
    
    list_add_tail(&epi->rdllink, &ep->rdllist); //将该epoll项添加到eventpoll的就绪链表中
    
    wake_up_locked(&ep->wq); //唤醒eventpoll睡眠的进程
}

唤醒之后,继续回到ep_poll函数运行,此时会判断if (!list_empty(&ep->rdllist) || timed_out)就绪链表不为空,则退出循环,然后调用ep_send_events(ep, events, maxevents)来获取就绪链表中的epoll项的状态,接下来分析ep_send_events(ep, events, maxevents))

static int ep_send_events(struct eventpoll *ep,
			  struct epoll_event __user *events, int maxevents)
{
	struct ep_send_events_data esed;

	esed.maxevents = maxevents;
	esed.events = events;

	return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
static int ep_scan_ready_list(struct eventpoll *ep,
			      int (*sproc)(struct eventpoll *,
					   struct list_head *, void *),
			      void *priv)
{
    LIST_HEAD(txlist); //定义一个链表头
    
    list_splice_init(&ep->rdllist, &txlist); //将就绪链表中的元素交换到txlist中
    
    error = (*sproc)(ep, &txlist, priv); //调用回调函数,此回调函数被初始化为ep_send_events_proc
    
    return error; //返回就绪的事件数
}

下面来分析ep_send_events_proc函数

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
			       void *priv)
{
    struct ep_send_events_data *esed = priv; //包含事件和事件数
    
    unsigned int revents;
    struct epitem *epi;
    struct epoll_event __user *uevent;

    /* 遍历就绪链表 */
    for(...)
    {
        epi = list_first_entry(head, struct epitem, rdllink);
		list_del_init(&epi->rdllink);
        
        /* 调用驱动程序的poll获取状态 */
        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) & epi->event.events;
    
        /* 将事件状态拷贝到应用层 */
    	__put_user(revents, &uevent->events);
        __put_user(epi->event.data, &uevent->data);
        
        eventcnt++;
        uevent++;
    }
    
    return eventcnt; //返回事件数
}

到这里可以知道ep_poll函数做了什么事情

  • 1、定义一个等待队列元素,添加到eventpoll的等待队列中,等待唤醒
  • 2、当IO准备就绪时,驱动程序会调用回调函数,将就绪的epoll项添加到就绪链表中,并唤醒eventpoll的等待队列
  • 3、继续运行ep_poll函数,发现就绪链表中有元素,则跳出循环
  • 4、调用ep_send_events函数,调用就绪的epoll项对应驱动程序的poll函数,得到状态,然后再返回到应用层

至此,epoll也就分析完了

总结

epoll是select/poll的增强版,select/epoll是采用轮询的方式,而epoll是通过回调,然后将就绪的IO添加到就绪链表,然后只查询这些就绪的IO状态,从而大大减少不必要的操作,所以在IO数量较多时,epoll的性能优于select/poll

2018-08-16 20:16:40 qq_38623623 阅读数 209
  • sql server 2008 数据库基础应用与开发教程

    本课程全面介绍了sqlserver2008系统的体系架构和功能。本课程以章节的形式,共13章,逐步学习,内容包括sqlserver2008基础知识、t-sql语言、数据库和表、select查询、视图、索引和游标、存储过程与触发器、数据库的备份与还原、安全与权限等。

    5895 人正在学习 去看看 徐新帅

select是在一定时间内,监听用户注册的可读、可写、异常事件。它的可读、可写、异常事件分别对应着文件描述符的集合。当有事件发生的时候,内核就会修改这些参数来告诉应用程序哪些文件描述符以及就绪了。这样的话下次调用select时就需要重新设置可读、可写、异常事件的文件描述符。
select系统调用的顺序是:
select () -> sys_select() -> do_select ()
所以,select是系统调用,它进入内核态就调用sys_select()

typedef struct 
{
    unsigned long *in, *out, *ex;
    unsigned long *res_in, *res_out, *res_ex;
}fd_set_bits;
/*
in out ex分别保存用户注册的感兴趣的事件,res_in,res_out,res_ex分别保存这个文件描述符上的用户感兴趣的事件,
返回的时候把res_in res_out res_ex的值赋给in,out,ex,这就是从用户空间拷贝到内核空间,然后再从内核空间拷贝到用户空间。
因为select这样每次调用的时候需要来回拷贝,所以造成效率问题。
当fd_set_bits这个结构体整合好之后当做参数,作为参数传递给do_select()
*/
//sys_select是处理时间函数
asmlinkage long sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp)
{
    fd_set_bits fds;//这个结构体保存用户传进来的参数
    char *bits;
    long timeout;
    int ret, size, max_fdset;

    //从用户进程拷贝超时时间,将超时时间换成时钟周期数
    timeout = MAX_SCHEDULE_TIMEOUT;//检查事件会不会永远等待下去
    if (tvp) {//对timeval超时时间参数处理
        time_t sec, usec;

        if ((ret = verify_area(VERIFY_READ, tvp, sizeof(*tvp)))
            || (ret = __get_user(sec, &tvp->tv_sec))
            || (ret = __get_user(usec, &tvp->tv_usec)))
            goto out_nofds;

        ret = -EINVAL;
        if (sec < 0 || usec < 0)
            goto out_nofds;
    //进行单位换算
        if ((unsigned long) sec < MAX_SELECT_SECONDS) {
            timeout = ROUND_UP(usec, 1000000/HZ);
            timeout += sec * (unsigned long) HZ;
        }
    }

    ret = -EINVAL;
    if (n < 0)
        goto out_nofds;

    /* max_fdset can increase, so grab it once to avoid race */
    max_fdset = current->files->max_fdset;
    if (n > max_fdset)
        n = max_fdset;

    /*
     * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
     * since we used fdset we need to allocate memory in units of
     * long-words. 
     */
    ret = -ENOMEM;
    size = FDS_BYTES(n);
    //为内核的fd_set_bits结构体申请空间并初始化
    bits = select_bits_alloc(size);
    if (!bits)
        goto out_nofds;
    //设置fds
    fds.in      = (unsigned long *)  bits;
    fds.out     = (unsigned long *) (bits +   size);
    fds.ex      = (unsigned long *) (bits + 2*size);
    fds.res_in  = (unsigned long *) (bits + 3*size);
    fds.res_out = (unsigned long *) (bits + 4*size);
    fds.res_ex  = (unsigned long *) (bits + 5*size);
    //把用户感兴趣的事件从用户态拷贝到内核态
    if ((ret = get_fd_set(n, inp, fds.in)) ||
        (ret = get_fd_set(n, outp, fds.out)) ||
        (ret = get_fd_set(n, exp, fds.ex)))
        goto out;
    //清空存放返回事件的位数组
    zero_fd_set(n, fds.res_in);
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);
    //执行do_select(),监听用户感兴趣的事件
    ret = do_select(n, &fds, &timeout);//do_select时select中很关键的一个函数

    if (tvp && !(current->personality & STICKY_TIMEOUTS)) {
        time_t sec = 0, usec = 0;
        if (timeout) {
            sec = timeout / HZ;
            usec = timeout % HZ;
            usec *= (1000000/HZ);
        }
        put_user(sec, &tvp->tv_sec);
        put_user(usec, &tvp->tv_usec);
    }

    if (ret < 0)
        goto out;
    if (!ret) {//超时返回,无设备就绪
        ret = -ERESTARTNOHAND;
        if (signal_pending(current))
            goto out;
        ret = 0;
    }
    //将存放事件结果的位数组拷贝给用户参数位数组,通过修改用户传进来的位数组将事件返回给用户
    if (set_fd_set(n, inp, fds.res_in) ||
        set_fd_set(n, outp, fds.res_out) ||
        set_fd_set(n, exp, fds.res_ex))
        ret = -EFAULT;

out:
    select_bits_free(bits, size);
out_nofds:
    return ret;
}

接下来是执行do_select()的操作

int do_select(int n, fd_set_bits *fds, long *timeout)
{
    struct poll_wqueues table;
    poll_table *wait;
    int retval, i;
    long __timeout = *timeout;

    spin_lock(&current->files->file_lock);
    retval = max_select_fd(n, fds);//要求对应的fd必须打开,并返回最大的fd
    spin_unlock(&current->files->file_lock);

    if (retval < 0)
        return retval;
    n = retval;
    //设置回调函数,该回调函数将当前进程放到等待队列table中
    poll_initwait(&table);//初始化table

    wait = &table.pt;
    if (!__timeout)
        wait = NULL;
    retval = 0;
    for (;;) 
    {
        unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
        //设置当前的进程状态为可中断的等待状态
        set_current_state(TASK_INTERRUPTIBLE);

        inp = fds->in; outp = fds->out; exp = fds->ex;
        rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

        for (i = 0; i < n; ++rinp, ++routp, ++rexp) 
        {//遍历所有的文件描述符
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
            struct file_operations *f_op = NULL;
            struct file *file = NULL;

            in = *inp++; out = *outp++; ex = *exp++;
            all_bits = in | out | ex;//把可读、可写、异常事件一起做逻辑或运算
            if (all_bits == 0) {//如果all_bits为0,就说明这个文件描述符上没有事件发生,直接执行下次循环
                i += __NFDBITS;
                continue;
            }
            //如果有事件发生,但是因为不知道是可读、可写,还是异常事件,所以循环遍历所有的(32个)bit位,每次将1左移一位和all_bits做逻辑与运算
            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) 
            {
                if (i >= n)
                    break;
                if (!(bit & all_bits))//和all_bits做逻辑与运算,如果为0,continue,遍历下一位,如果不为0则说明这个位对应的fd有感兴趣的事件发生,
                    continue;
                file = fget(i);
                if (file) 
                {
                    f_op = file->f_op;
                    mask = DEFAULT_POLLMASK;
                    if (f_op && f_op->poll)

                    mask = (*f_op->poll)(file, retval ? NULL : wait);//调用回调函数poll,poll是一个函数指针,将当前进程挂上等待队列,poll执行成功后唤醒这个进程
                    fput(file);
                    //然后拿这个bit分别去和用户传进来的in,out,ex做逻辑与运算,判断是可读,可写还是异常事件发生了                   
                    if ((mask & POLLIN_SET) && (in & bit))                          {//表示可读
                        res_in |= bit;//同时设置返回位数组
                        retval++;//retval是in, out, ex这三个集合的总数目要返回select的总数目,计数器++
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) {//表示可写
                        res_out |= bit;
                        retval++;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) {//表示异常
                        res_ex |= bit;
                        retval++;
                    }
                }
                //如果有必要,就重新调度进程
                cond_resched();
            }
            //将返回事件位数组的值拷贝给传进来的参数
            if (res_in)
                *rinp = res_in;
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
        }
        //遍历完后,检查retval,看是否有可读、可写、异常事件,有则retval不为0,退出死循环
        wait = NULL;
        if (retval || !__timeout || signal_pending(current))//retval不为0,break
            break;
        if(table.error) {
            retval = table.error;
            break;
        }

        //schedule_timeout是让CPU在指定时间用完以后或者接受一个信号量时,该进程才可以运行 
        __timeout = schedule_timeout(__timeout);
    }
    __set_current_state(TASK_RUNNING);
    //清理等待队列
    poll_freewait(&table);

    /*
     * Up-to-date the caller timeout.
     */
    *timeout = __timeout;
    return retval;
}
2015-03-20 13:35:34 vonzhoufz 阅读数 5864
  • sql server 2008 数据库基础应用与开发教程

    本课程全面介绍了sqlserver2008系统的体系架构和功能。本课程以章节的形式,共13章,逐步学习,内容包括sqlserver2008基础知识、t-sql语言、数据库和表、select查询、视图、索引和游标、存储过程与触发器、数据库的备份与还原、安全与权限等。

    5895 人正在学习 去看看 徐新帅
Linux select 机制深入分析
     
     作为IO复用的实现方式,select是提高了抽象和batch处理的级别,不是传统方式那样阻塞在真正IO读写的系统调用上,而是阻塞在select系统调用上,等待我们关注的描述符就绪。当然现在更好的方式是epoll,比如Java中的NIO底层就是用的epoll。这篇文章只是为了搞懂select机制的原理,不看源码就不能说懂这些IO复用手法。也在面试过程中体会到了,不去实践就会发现知道的永远是皮毛。面试问题:select的最大描述符限制可以修改吗?(有待深入)

用户层API语法:
 /* According to POSIX.1-2001 */
       #include <sys/select.h>

       /* According to earlier standards */
       #include <sys/time.h>
       #include <sys/types.h>
       #include <unistd.h>

       int select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds, struct timeval *timeout);

       void FD_CLR(int fd, fd_set *set);
       int  FD_ISSET(int fd, fd_set *set);
       void FD_SET(int fd, fd_set *set);
       void FD_ZERO(fd_set *set);

       #include <sys/select.h>
      int pselect(int nfds, fd_set *readfds, fd_set *writefds,
                   fd_set *exceptfds, const struct timespec *timeout,
                   const sigset_t *sigmask);

这里的API发生了变化(参见UNPv1 P127),timeout值是允许更新的,这在内核中有体现。

select系统调用的内核源码主要流程是:sys_select() -> core_sys_select() -> do_select() -> poll_select_copy_remaining。
可代码可以一目了然。

/*
* SYSCALL_DEFINE5宏的作用就是将其转成系统调用的常见形式,
* asmlinkage long sys_select(int n, fd_set __user *inp, fd_set __user *outp,fd_set __user *exp, struct timeval __user *tvp);
*/
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
          fd_set __user *, exp, struct timeval __user *, tvp)
{
     struct timespec end_time, *to = NULL;
     struct timeval tv;
     int ret;

     if (tvp) {//如果设置了超时阈值
          if (copy_from_user(&tv, tvp, sizeof(tv)))
               return -EFAULT;

          to = &end_time;
          // 从timeval(秒 微秒)转换为(秒 纳秒)  继而建立超时
          if (poll_select_set_timeout(to,
                    tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
                    (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
               return -EINVAL;
     }
     // 核心工作
     ret = core_sys_select(n, inp, outp, exp, to);
     //core_sys_select处理的fd_set 接下来更新timeout的值
     ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);

     return ret;
}

/*
* We can actually return ERESTARTSYS instead of EINTR, but I'd
* like to be certain this leads to no problems. So I return
* EINTR just for safety.
*
* Update: ERESTARTSYS breaks at least the xview clock binary, so
* I'm trying ERESTARTNOHAND which restart only when you want to.
*/
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
                  fd_set __user *exp, struct timespec *end_time)
{
     // poll.h :fd_set_bits包装了6个long *,代表三个描述表集的值-结果
     fd_set_bits fds;
     void *bits;
     int ret, max_fds;
     unsigned int size;
     struct fdtable *fdt;
     /* Allocate small arguments on the stack to save memory and be faster 
     * 先是预分配256B的空间  大多数情况下能够满足需要 特殊情况在下面会分配空间
     */
     long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

     ret = -EINVAL;
     if (n < 0)
          goto out_nofds;

     /* max_fds can increase, so grab it once to avoid race */
     rcu_read_lock();
     // 获得打开文件描述符表(指针析取)
     fdt = files_fdtable(current->files);
     max_fds = fdt->max_fds;
     rcu_read_unlock();
     if (n > max_fds)
          n = max_fds;//参数修正

     /*
     * 现在要监视的描述符个数个size*8个对于每一个都需要6个位来标示
     * 它是否可以读写异常并且把结果写在res_in res_out res_exp中
     * 所以构成了下面的内存布局(见图1)
     */
     size = FDS_BYTES(n);
     bits = stack_fds;
     if (size > sizeof(stack_fds) / 6) {
          /* Not enough space in on-stack array; must use kmalloc */
          ret = -ENOMEM;
          bits = kmalloc(6 * size, GFP_KERNEL);
          if (!bits)
               goto out_nofds;
     }
     fds.in      = bits;
     fds.out     = bits +   size;
     fds.ex      = bits + 2*size;
     fds.res_in  = bits + 3*size;
     fds.res_out = bits + 4*size;
     fds.res_ex  = bits + 5*size;

     // 从用户空间得到这些fd sets 
     if ((ret = get_fd_set(n, inp, fds.in)) ||
         (ret = get_fd_set(n, outp, fds.out)) ||
         (ret = get_fd_set(n, exp, fds.ex)))
          goto out;
     // 初始化这些结果参数为0
     zero_fd_set(n, fds.res_in);
     zero_fd_set(n, fds.res_out);
     zero_fd_set(n, fds.res_ex);

     // 到这里 一切准备工作都就绪了.....
     ret = do_select(n, &fds, end_time);

     if (ret < 0)
          goto out;
     if (!ret) {
          ret = -ERESTARTNOHAND;
          if (signal_pending(current))
               goto out;
          ret = 0;
     }
     // do_select正确返回后 通过copy_to_user将fds中的描述符就绪结果参数
     // 反馈到用户空间
     if (set_fd_set(n, inp, fds.res_in) ||
         set_fd_set(n, outp, fds.res_out) ||
         set_fd_set(n, exp, fds.res_ex))
          ret = -EFAULT;

out:
     if (bits != stack_fds)
          kfree(bits);
out_nofds:
     return ret;
}

// select 的核心工作
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
     ktime_t expire, *to = NULL;
     struct poll_wqueues table;
     poll_table *wait;
     int retval, i, timed_out = 0;
     unsigned long slack = 0;
     unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
     unsigned long busy_end = 0;

     // 得到Select要监测的最大的描述符值
     rcu_read_lock();
     retval = max_select_fd(n, fds);
     rcu_read_unlock();

     if (retval < 0)
          return retval;
     n = retval;

     poll_initwait(&table);
     wait = &table.pt;
     // 定时器值(秒 纳秒)为0的话标示不等待
     if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
          wait->_qproc = NULL;
          timed_out = 1;
     }

     
     if (end_time && !timed_out)
          slack = select_estimate_accuracy(end_time);

     // 下面会用到这个变量统计就绪的描述符个数 所以先清0
     retval = 0;
     for (;;) {
          unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
          bool can_busy_loop = false;

          inp = fds->in; outp = fds->out; exp = fds->ex;
          rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

          for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
               unsigned long in, out, ex, all_bits, bit = 1, mask, j;
               unsigned long res_in = 0, res_out = 0, res_ex = 0;

               in = *inp++; out = *outp++; ex = *exp++;
               all_bits = in | out | ex;
               // 要一次轮询这些这些位图 定位到某个有我们关心的fd的区间
               // 否则以32bits步长前进
               if (all_bits == 0) {
                    i += BITS_PER_LONG;
                    continue;
               }
               // 当前这个区间有我们关心的fd 所以深入细节追踪(图2)
               for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
                    struct fd f;
                    if (i >= n)
                         break;
                    if (!(bit & all_bits))
                         continue;
                    // 如果发现了当前区间的某一个bit为1 则说明对应的fd需要我们处理 
                    // 此时此刻的i正是文件描述符值
                    f = fdget(i);
                    if (f.file) {
                         const struct file_operations *f_op;
                         f_op = f.file->f_op;
                         mask = DEFAULT_POLLMASK;
                         //具体到文件操作结果中的poll函数指针  对于
                         if (f_op->poll) {
                              wait_key_set(wait, in, out,
                                        bit, busy_flag);
                              mask = (*f_op->poll)(f.file, wait);// TODO
                         }
                         // 上面的fdget增加了file引用计数 所以这里恢复
                         fdput(f);
                         /* 判断关注的描述符是否就绪 就绪的话就更新到结果参数中
                         * 并且增加就绪个数
                         */
                         if ((mask & POLLIN_SET) && (in & bit)) {
                              res_in |= bit;
                              retval++;
                              wait->_qproc = NULL;
                         }
                         if ((mask & POLLOUT_SET) && (out & bit)) {
                              res_out |= bit;
                              retval++;
                              wait->_qproc = NULL;
                         }
                         if ((mask & POLLEX_SET) && (ex & bit)) {
                              res_ex |= bit;
                              retval++;
                              wait->_qproc = NULL;
                         }
                         /* got something, stop busy polling 
                         * 停止忙循环 
                         */
                         if (retval) {
                              can_busy_loop = false;
                              busy_flag = 0;

                         /*
                         * only remember a returned
                         * POLL_BUSY_LOOP if we asked for it
                         */
                         } else if (busy_flag & mask)
                              can_busy_loop = true;

                    }
               }
               // 这一轮的区间遍历完之后 更新结果参数
               if (res_in)
                    *rinp = res_in;
               if (res_out)
                    *routp = res_out;
               if (res_ex)
                    *rexp = res_ex;
               /* 进行一次调度 允许其他进程运行
               * 后面有等待队列唤醒
               */
               cond_resched();
          }
          // 一轮轮询之后
          wait->_qproc = NULL;
          // 如果有描述符就绪 或者设置了超时 或者有待处理信号 则退出这个死循环
          if (retval || timed_out || signal_pending(current))
               break;
          if (table.error) {
               retval = table.error;
               break;
          }

          /* only if found POLL_BUSY_LOOP sockets && not out of time */
          if (can_busy_loop && !need_resched()) {
               if (!busy_end) {
                    busy_end = busy_loop_end_time();
                    continue;
               }
               if (!busy_loop_timeout(busy_end))
                    continue;
          }
          busy_flag = 0;

          /* 如果设置超时 并且这是首次循环(to==NULL) */
          if (end_time && !to) {
               // 从timespec转化为ktime类型(64位的有符号值)
               expire = timespec_to_ktime(*end_time);
               to = &expire;
          }
          /*设置该进程状态TASK_INTERRUPTIBLE 睡眠直到超时
          * 返回到这里后进程 TASK_RUNNING
          */
          if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack))
               timed_out = 1;
     }

     // 释放该poll wait queue
     poll_freewait(&table);

     return retval;
}

附图1:


附图2:




参考:
(1)Linux kernel 3.18 source code 
(2)Linux man page
(3)UNPv1
耗时:3h



select源码剖析

阅读数 369

没有更多推荐了,返回首页