2019-07-10 10:59:36 weixin_42462202 阅读数 1447
  • sql server 2008 数据库基础应用与开发教程

    本课程全面介绍了sqlserver2008系统的体系架构和功能。本课程以章节的形式,共13章,逐步学习,内容包括sqlserver2008基础知识、t-sql语言、数据库和表、select查询、视图、索引和游标、存储过程与触发器、数据库的备份与还原、安全与权限等。

    4475 人正在学习 去看看 徐新帅

IO多路复用接口Linux内核源码剖析,源码之前,了无秘密

Linux poll内核源码剖析

Linux select内核源码剖析

Linux epoll内核源码剖析

Linux select内核源码剖析


select的原理其实是和poll是一样的,都是采用轮询的方式。select相对于poll也许是比较节省空间吧,因为select是采用bitmap来标志的

本文先讲解一下如何在应用层使用select,然后再深入内核剖析select机制

select应用程序

select可以监听多个文件描述符,直到条件满足或者超时返回

  • select函数原型

    int select(int nfds, fd_set *readfds, fd_set *writefds,
               fd_set *exceptfds, struct timeval *timeout);
    

    nfds:最大的文件描述符加1

    readfds:监听可读集合

    writefds:监听可写集合

    exceptfds:监听异常集合

    timeout:超时时间

    对于这些集合,有一组函数可以进行设置

    void FD_SET(int fd, fd_set *set); //设置文件描述符到集合中
    void FD_CLR(int fd, fd_set *set); //从集合中清除文件描述符标志
    int  FD_ISSET(int fd, fd_set *set); //判断文件描述符在集合中是否被标志
    void FD_ZERO(fd_set *set); //清空集合
    

demo

下面这个程序使用select监听标准输入,直到标准输入可读时,返回并打印内容

#include <stdio.h>
#include <sys/select.h>

int main(int argc, char* argv[])
{
    fd_set rfds;
    int nfds;
    int i;
    char buf[1024];
    int len;

    FD_ZERO(&rfds); //清空集合

    FD_SET(0, &rfds); //标准输入
    nfds = 0 + 1; //最大文件描述符加1

    while(1)
    {
        fd_set fds = rfds;
        
        /* 开始监听 */
        if(select(nfds, &fds, NULL, NULL, NULL) < 0)
        {
            printf("select err.\n");
            return -1;
        }

        for(i = 0; i < nfds; i++)
        {
            /* 判断是否满足条件 */
            if(FD_ISSET(i, &fds))
            {
                len = read(i, buf, 1024);
                buf[len] = '\0';
                printf("read buf: %s\n", buf);
            }
        }
    }

    return 0;
}

select机制内核源码剖析

我们先来看看fd_set是什么东西

typedef __kernel_fd_set		fd_set;

typedef struct {
	unsigned long fds_bits [__FDSET_LONGS]; //定义一个数组
} __kernel_fd_set;

从上面可以看出,fd_set其实就是一个数组,内核用一个位来表示一个文件描述符,从内核定义来看,一共有1024个位

下面再来看看这四个设置函数

void FD_SET(int fd, fd_set *set); //设置文件描述符到集合中
void FD_CLR(int fd, fd_set *set); //从集合中清除文件描述符标志
int  FD_ISSET(int fd, fd_set *set); //判断文件描述符在集合中是否被标志
void FD_ZERO(fd_set *set); //清空集合
#define FD_SET(fd,fdsetp)	__FD_SET(fd,fdsetp)
#define FD_CLR(fd,fdsetp)	__FD_CLR(fd,fdsetp)
#define FD_ISSET(fd,fdsetp)	__FD_ISSET(fd,fdsetp)
#define FD_ZERO(fdsetp)		__FD_ZERO(fdsetp)

先看FD_SET,其实就是将特定的位置1

#define __FD_SET(fd, fdsetp) \
		(((fd_set *)(fdsetp))->fds_bits[(fd) >> 5] |= (1<<((fd) & 31)))

再看看FD_CLR,其实就是将特定的位置0

#define __FD_CLR(fd, fdsetp) \
		(((fd_set *)(fdsetp))->fds_bits[(fd) >> 5] &= ~(1<<((fd) & 31)))

看看FD_ISSET,其实就是判断特定的位是否被置1

#define __FD_ISSET(fd, fdsetp) \
		((((fd_set *)(fdsetp))->fds_bits[(fd) >> 5] & (1<<((fd) & 31))) != 0)

看一下FD_ZERO,其实就是将所有的位置0

#define __FD_ZERO(fdsetp) \
		(memset (fdsetp, 0, sizeof (*(fd_set *)(fdsetp))))

至此我们直到,fd_set其实就是一个数组,然后里面每一个位都表示一个文件描述符的状态,我们将我们要监听的文件描述符对应的位标志好后,传递给内核,内核会将状态通过位标记返回到应用层

下面就马上来分析select对应的系统调用

select对应的系统调用如下

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
		fd_set __user *, exp, struct timeval __user *, tvp)

将其展开后得到如下函数

long sys_select(int n, fd_set __user * inp, fd_set __user * outp,
                    fd_set __user * exp, struct timeval __user * tvp)
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
		fd_set __user *, exp, struct timeval __user *, tvp)
{
    /* 从应用层会传递过来三个需要监听的集合,可读,可写,异常 */
    ret = core_sys_select(n, inp, outp, exp, to);
    
    return ret;
}

接下来看core_sys_select

int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
			   fd_set __user *exp, struct timespec *end_time)
{
    /* 在栈上分配一段内存 */
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
    
    size = FDS_BYTES(n); //n个文件描述符需要多少个字节
    
    /* 
     * 如果栈上的内存太小,那么就重新分配内存
     * 为什么是除以6呢?
     * 因为每个文件描述符要占6个bit(输入:可读,可写,异常;输出结果:可读,可写,异常)
     */
    if (size > sizeof(stack_fds) / 6)
		bits = kmalloc(6 * size, GFP_KERNEL);
    
    /* 设置好bitmap对应的内存空间 */
    fds.in      = bits; //可读
	fds.out     = bits +   size; //可写
	fds.ex      = bits + 2*size; //异常
	fds.res_in  = bits + 3*size; //返回结果,可读
	fds.res_out = bits + 4*size; //返回结果,可写
	fds.res_ex  = bits + 5*size; //返回结果,异常
    
    /* 将应用层的监听集合拷贝到内核空间 */
    get_fd_set(n, inp, fds.in);
    get_fd_set(n, outp, fds.out);
    get_fd_set(n, exp, fds.ex);
    
    /* 清空三个输出结果的集合 */
	zero_fd_set(n, fds.res_in);
	zero_fd_set(n, fds.res_out);
	zero_fd_set(n, fds.res_ex);
    
    /* 调用do_select阻塞,满足条件时返回 */
    ret = do_select(n, &fds, end_time);
    
    /* 将结果拷贝回应用层 */
    set_fd_set(n, inp, fds.res_in);
    set_fd_set(n, outp, fds.res_out);
    set_fd_set(n, exp, fds.res_ex);
    
    return ret;
}

下面来看一看do_select函数

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
    for (;;) {
        /* 遍历所有监听的文件描述符 */
    	for (i = 0; i < n; ++rinp, ++routp, ++rexp)
        {
            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1)
            {
                /* 调用每一个文件描述符对应驱动的poll函数,得到一个掩码 */
                mask = (*f_op->poll)(file, wait);
                
                /* 根据掩码设置相应的bit */
                if ((mask & POLLIN_SET) && (in & bit)) {
                    res_in |= bit;
                    retval++;
                }
                
                if ((mask & POLLOUT_SET) && (out & bit)) {
                    res_out |= bit;
                    retval++;
                }
                
                if ((mask & POLLEX_SET) && (ex & bit)) {
                    res_ex |= bit;
                    retval++;
                }
            }
        }
    
        /* 如果条件满足,则退出 */
        if (retval || timed_out || signal_pending(current))
            break;
        
        /* 调度,进程睡眠 */
        poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack);
    }
}

do_select会遍历所有要监听的文件描述符,调用对应驱动程序的poll函数,驱动程序的poll一般实现如下

static unsigned int button_poll(struct file *fp, poll_table * wait)
{
	unsigned int mask = 0;

    /* 调用poll_wait */
	poll_wait(fp, &wq, wait); //wq为自己定义的一个等待队列头

	/* 如果条件满足,返回相应的掩码 */
	if(condition)
		mask |= POLLIN; 

	return mask;
}

看看poll_wait做了什么

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && wait_address)
		p->qproc(filp, wait_address, p);
}

p->qproc在之前又被初始化为__pollwait

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
				poll_table *p)
{
    /* 分配一个结构体 */
    struct poll_table_entry *entry = poll_get_entry(pwq);
    
    /* 将等待队列元素加入驱动程序的等待队列头中 */
	add_wait_queue(wait_address, &entry->wait); 
}

由此可知,do_select中对每一个文件描述符调用(*f_op->poll)(file, wait),是为每一个文件描述符申请一个等待队列元素,然后将其添加到对应驱动程序的等待队列中,等待条件满足时唤醒

我们再回到do_select函数里,第一遍遍历调用``(*f_op->poll)(file, wait)`是为了为每一个文件描述符申请一个等待队列元素,将其添加到对应驱动程序的等待队列中,然后会睡眠等待,当有条件满足时,对应的驱动会通过等待队列唤醒该进程,然后进行第二次遍历,此时得到一个掩码,然后设置好每一个文件描述符状态,退出

至此,select也就分析完了

总结

select和poll的实现原理是一样的,只是select采用bitmap的方式来标记文件描述符,然后select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是1024

select会有两次遍历所有监听的文件描述符,第一次是将等待队列元素添加到对应驱动程序的等待队列中,然后调度,睡眠等待唤醒,第二次是当条件满足时,驱动程序会唤醒等待队列,然后select会进行第二次遍历,获取一个掩码,设置好每一个文件描述符的bitmap,然后再将结果拷贝回用户空间

2018-05-21 20:58:52 Function_Dou 阅读数 979
  • sql server 2008 数据库基础应用与开发教程

    本课程全面介绍了sqlserver2008系统的体系架构和功能。本课程以章节的形式,共13章,逐步学习,内容包括sqlserver2008基础知识、t-sql语言、数据库和表、select查询、视图、索引和游标、存储过程与触发器、数据库的备份与还原、安全与权限等。

    4475 人正在学习 去看看 徐新帅

select源码分析


select()

函数是从SYSCALL_DEFINE5(select, ...)开始. 可以简单的将SYSCALL_DEFINEx理解为系统定义的系统函数, 如果想了解 SYSCALL_DEFINE 可以看一下.

具体的执行流程是 :

  • 将时间定义从用户空间复制到内核空间中, 进行时间片的设置, 如果为0或不合法就设置为默认值, 否则就设置为传入的时间.
  • 调用core_sys_select函数, 以实现等待消息到来, 轮询等主要操作
  • 调用timeval_compare返回执行完剩余的时间
  • 最后将返回的时间使用copy_to_user复制到用户空间
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, fd_set __user *, exp, struct timeval __user *, tvp)
{
    s64 timeout = -1;
    struct timeval tv;
    int ret;

    if (tvp)
    {
        // 将数据从用户空间拷贝到内核空间的tv中
        if (copy_from_user(&tv, tvp, sizeof(tv)))
            return -EFAULT;
        ...
    }

    // 设置 fds 结构的参数并且等待消息的到来, 或者时间片没有结束调度程序
    ret = core_sys_select(n, inp, outp, exp, &timeout);

    // 设置时间片
    if (tvp)
    {
        ...
        // 返回执行完后剩余时间
        if (timeval_compare(&rtv, &tv) >= 0)
            rtv = tv;
        if (copy_to_user(tvp, &rtv, sizeof(rtv))) 
        ...
    }
    return ret;
}

core_sys_select

因为select的主要功能都是do_select函数, 这里我们先分析一下关于core_sys_select函数.

  • 获取文件文件描述符表并存放在fdtable
  • fdtable读, 写, 错误分配空间, 并初始化
  • 将select传入的readfds,writefds, errorfds参数从用户空间复制到内核空间的fdtable对应的读, 写, 错误中. 这里需要解释一下, fdselect主要是保存之后要将来的信号返回给用户空间的.
  • 调用do_select, 轮询等待消息的到来, 并且将消息的文件描述符保存在fds结构体中
  • fds保存的读, 写, 错误集合从内核空间复制到用户空间
// 参数满足 int select(int maxfdp,fd_set *readfds,fd_set *writefds,fd_set *errorfds,struct timeval*timeout); 
// typedef __kernel_fd_set fd_set
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,fd_set __user *exp, s64 *timeout)
{
    fd_set_bits fds;
    void *bits;
    int ret, max_fds;
    unsigned int size;
    struct fdtable *fdt;
    /*
    #define FRONTEND_STACK_ALLOC    256
    #define SELECT_STACK_ALLOC  FRONTEND_STACK_ALLOC
    */ 
    // 计算出来有32位, 所以数组的大小也定义的是32
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
    // 文件描述符表
    fdt = files_fdtable(current->files);
    ...
    fds.in      = bits;
    fds.out     = bits +   size;
    fds.ex      = bits + 2*size;
    fds.res_in  = bits + 3*size;
    fds.res_out = bits + 4*size;
    fds.res_ex  = bits + 5*size;
    // 实现将数据用户空间复制到内核空间中, 这是是把数据从inp复制到内核的fds空间中
    if ((ret = get_fd_set(n, inp, fds.in)) ||
        (ret = get_fd_set(n, outp, fds.out)) ||
        (ret = get_fd_set(n, exp, fds.ex)))
        goto out;
    // res初始化为0
    zero_fd_set(n, fds.res_in);
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);
    // 调用do_select函数, 检验, 等待消息到来, 并且do_select函数会把到来消息的文件描述符存放在fds结构体中
    ret = do_select(n, &fds, timeout);
    ...
    // 实现将数据从内核空间复制到内核空间中
    if (set_fd_set(n, inp, fds.res_in) ||
        set_fd_set(n, outp, fds.res_out) ||
        set_fd_set(n, exp, fds.res_ex))
        ret = -EFAULT;
    ...
    return ret;
}

在这里可以看出来core_sys_select函数也是只是分配空间, 复制到内核中, 我们还没有看到阻塞等待消息的代码, 所以重要的还是在do_select函数中. 可以看到在最后core_sys_select调用了do_select函数, 等待消息的到来.

关于stack_fds这个数组, 要存放的是6个位图, 分别对应用户态传入的存放监听读、写、异常三个操作的文件描述符集合,以及这三个操作在select执行过后需要返回的三个集合。这是 select 的机制,每次执行 select() 之后,函数把“就绪”的文件描述符留下,返回。下一次,再次执行 select() 时,需要重新把需要监听的文件描述符传入。

do_select

不过分析函数前, 先看看三个待会会用到的宏定义. POLLIN_SET是检查输入消息, 同理POLLOUT_SET检查输出, POLLEX_SET检查错误.

#define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)
#define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
#define POLLEX_SET (POLLPRI)

好了, 现在可以开始分析do_select源码.

  • 获得系统能支持的最大文件描述符
  • 调用poll_initwait函数设置回调消息到来时的回调函数
  • 轮询, 不断的检查链表中有没有消息到来
  • 没有消息就直接启动调度程序, 等待一定的时间片返回进程重复判断消息到来
  • 有消息到来, 或者设置的时间到达后, 退出轮询, 判断消息的类型(输入, 输出, 还是错误), 并保存其消息的文件描述符
  • 将进程设置为运行态, 清除集合, 返回
int do_select(int n, fd_set_bits *fds, s64 *timeout)
{
    struct poll_wqueues table;
    poll_table *wait;
    int retval, i;
    ...
    // 获取最大文件描述符
    retval = max_select_fd(n, fds);
    ...
    n = retval;
    // 初始化等待队列, 并且设置回调函数, 有就绪文件描述符是就执行回调
    poll_initwait(&table);
    wait = &table.pt;
    if (!*timeout)
        wait = NULL;
    retval = 0;
    for (;;)
    {
        unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
        long __timeout;
        // 进程设置为可中断的
        set_current_state(TASK_INTERRUPTIBLE);
        // select的三个参数, 读, 写, 错误. 以及三个返回参数.
        inp = fds->in; outp = fds->out; exp = fds->ex;
        rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
        // 遍历所有的设置的文件描述符
        for (i = 0; i < n; ++rinp, ++routp, ++rexp) 
        {
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
            const struct file_operations *f_op = NULL;
            struct file *file = NULL;
            in = *inp++; out = *outp++; ex = *exp++;
            // 确定该位有设置好等待返回的进程描述符, 如果没有参数继续自增, 进程等待, 有设置好的描述符, 就执行下一个for循环
            all_bits = in | out | ex;
            if (all_bits == 0)
            {
                i += __NFDBITS;
                continue;
            }

            // 每次遍历一位, 也就是遍历一个文件描述符
            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) 
            {
                int fput_needed;
                // 是否超过最大设置的文件描述符
                if (i >= n)
                    break;
                // 判断是哪一位的有消息到来
                if (!(bit & all_bits))
                    continuea;
                // 从文件描述符中获取文件结构体
                file = fget_light(i, &fput_needed);
                if (file) 
                {
                    f_op = file->f_op;
                    mask = DEFAULT_POLLMASK;
                    if (f_op && f_op->poll)
                        // 调用文件所对应的具体方法, 具体操作, 比如是POLLIN操作, 检测了文件是否就绪,而且还把当前进程加入等待队列,如果该文件描述符就绪,则会触发回调,以及唤醒该进程
                        mask = (*f_op->poll)(file, retval ? NULL : wait);
                    fput_light(file, fput_needed);
                    if ((mask & POLLIN_SET) && (in & bit)) // 与掩码mask相与, 判断是否是输入操作, 同时检测是否该文件描述符设置了输入操作
                    {
                        res_in |= bit;  // 返回的res_op设置为POLLIN
                        retval++;
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) 
                    {
                        res_out |= bit;
                        retval++;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) 
                    {
                        res_ex |= bit;
                        retval++;
                    }
                }
            }
            if (res_in)
                *rinp = res_in;
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
            cond_resched();
        }
        wait = NULL;
        // 如果时间到了或者有事件到来
        if (retval || !*timeout || signal_pending(current))
            break;
        ...

        // 没有消息到来, 并且时间没有结束, 那么就进行调度程序, 等待时间片结束后返回到进程, 重新判断消息是否到来, 重复操作
        __timeout = schedule_timeout(__timeout);
        if (*timeout >= 0)
            *timeout += __timeout;
    }
    // 将进程设置为运行态
    __set_current_state(TASK_RUNNING);
    // 释放页中的所有数据
    // 所以我们在不断轮询的时侯, 每次都会重新为 fds 赋值, 因为每次操作完的时侯都会将传入的 fds 修改, 清除, 所以我们需要重复为 fds 恢复
    poll_freewait(&table);

    return retval;
}

这里也就可以看出select的效率, 时间复杂度居然是O(n3), 随着等待的文件描述符越来越多, 那么等待的时间也就越长, 而且等待的数据也是很有限, 对于一个服务器来说这是在太少了. 剩下的操作已经在注释中写的很清楚了. 希望对读者有用.


总结

我总结了一下函数的调用历程, 这样调理也就更加的清楚了.

这里写图片描述
select()函数首先是将参数时间调用copy_from_user将其从用户空间复制到内核空间中, 然后对时间片进行设置(如果时间<=0 或非法将设置为默认一直等待), 然后调用core_sys_select函数, 分配一个fds的数据结构来保存关于传入参数in, out, ex集合的数据. 接着就调用了函数do_select, 先是在设置的时间段进行等待, 遍历所有传入的文件描述符, 如果有消息到来就直接返回给用户空间, 没有的消息, 就先进行进程调度, 同时设置一个回调时间, 在时间片结束后又回调到该进程, 继续判断消息的到来, 还是没有消息就重复调度, 直到有消息到来. 消息到来后, 先是遍历所有的消息, 确定是集合中的消息. 是文件描述符集合中的消息, 就保存该文件描述符, 退出轮询, 并且将文件集合清除, 只保留一个描述符, 然后将描述符返回到core_sys_select, 然后该函数将返回的描述符从内核空间复制到用户空间, 通过FD_ISSET来进行确认. 最后select将剩余的时间返回.

2013-01-14 22:45:34 huaxi1902 阅读数 854
  • sql server 2008 数据库基础应用与开发教程

    本课程全面介绍了sqlserver2008系统的体系架构和功能。本课程以章节的形式,共13章,逐步学习,内容包括sqlserver2008基础知识、t-sql语言、数据库和表、select查询、视图、索引和游标、存储过程与触发器、数据库的备份与还原、安全与权限等。

    4475 人正在学习 去看看 徐新帅

linux select()函数实现分析
  int select(int n,fd_set * readfds,fd_set * writefds,fd_set * exceptfds,struct timeval * timeout);
select()函数是linux下实现异步I/O的一种机制,最重要的使用场合是高效的网络编程。在这里不谈论select()
的具体用法,而是看看select()机制的内部实现。
  我个人认为把select()看成是建立在linux文件系统的一个库函数更合适,虽然select()的代码位于内核(文件系统部分),但是和内核本身的联系还是很小的。
  我们从函数的定义入手,先看看函数的参数,第一个是一个进程内的文件描述符数组最大下标加1,第二个参数实际是一个无符号长整型数组。

  




  以上的图描述了第一个参数maxfd,和fd_set的作用。file_struct是文件描述符指针数组,0-2分别是输入,输出和出错文件描述符。所以在

你添加第一要监听的描述符的时候,它的下标是3,也就是第四个。select的工作原理是循环的扫描文件描述符,就是从[0,maxfd+1),凡是

fd_set中所对应的位被置 1(最终循环的时候fd_set的结果是后三个参数的合集),select都会调用相应的poll()函数(fd[4]->file->f_op-

>file_operations->poll()),poll是具体文件相关的函数,对于TCP就是tcp_poll().tcp_poll()会返回被监听socket的状态,比如可写或可读

。然后返回相关的状态给应用程序。之后应用程序就可以通过相关的宏来检测文件描述符的状态是否有被改变了。
  上图是select()函数的程序执行流程。从中可以看出,select()是先轮循了所有的描述符后才判断时间的,为了比较好的控制select()在循

环中所花费的时间,循环的次数一定不能太大,这也就是最大循环次数被定为1024的原因。
  
     我们总结下select()所存在的一些问题。
    1.每次调用时要重复地从用户态读入参数。
    fd_set * readfds,fd_set * writefds,fd_set * exceptfds 这三个参数其实都是长整型数组,每次 调用的时候都会从用户空间拷贝

置内存空间。这也是一个比较耗费时间的操作。

    2.每次调用时要重复地扫描文件描述符。
    其实在很多时候,大部分的文件描述符的状态是没有改变的,但是还是进行了相应的poll()操作,这里也浪费了不必要的时间。
   

    3.每次在调用开始时,要把当前进程放入各个文件描述符的等待队列。在调用结束后,又把进程从各个等待  队列中删除。
    对于这点我们可以看看下面的图。task1的开了n个文件。现在它要监听file4-file9,那么操作系统就会把task1连接进要监听的文件的等待

队列中。在阻塞模式下,只要其中一个文件的状态改变,改进程就会被唤醒,投入运行,所以这个时候,又会将该进程从所有的等待对列中移

出。从源代码的执行看,即使是非阻塞模式下,这中情况还是会发生,因为,连入的操作是在文件相关的poll()函数中进行的,而每次循环都

会执行poll().
     针对以上的缺点,linux上引入了epoll.


Linux 2.6.25中的select系统调用 
主要有4个函数: 
sys_select:处理时间参数,调用core_sys_select。 
core_sys_select:处理三个fd_set参数,调用do_select。 
do_select:做select/poll的工作。在合适的时机把自己挂起等待,调用sock_poll。 
sock_poll:用函数指针分派到具体的协议层函数tcp_poll、udp_poll、datagram_poll。 
层层分工明确,我也要多学习这种方式啊。


/*
sys_select(fs/select.c)
处理了超时值(如果有),将struct timeval转换成了时钟周期数,调用core_sys_select,然后检查剩余时间,处理时间
*/
asmlinkage long sys_select(int n, fd_set __user *inp, fd_set __user *outp,
						   fd_set __user *exp, struct timeval __user *tvp)
{
	s64 timeout = -1;
	struct timeval tv;
	int ret;

	if (tvp) {/*如果有超时值*/
		if (copy_from_user(&tv, tvp, sizeof(tv)))
			return -EFAULT;

		if (tv.tv_sec < 0 || tv.tv_usec < 0)/*时间无效*/
			return -EINVAL;

		/* Cast to u64 to make GCC stop complaining */
		if ((u64)tv.tv_sec >= (u64)MAX_INT64_SECONDS)
			timeout = -1;	/* 无限等待*/
		else {
			timeout = DIV_ROUND_UP(tv.tv_usec, USEC_PER_SEC/HZ);
			timeout += tv.tv_sec * HZ;/*计算出超时的相对时间,单位为时钟周期数*/
		}
	}

	/*主要工作都在core_sys_select中做了*/
	ret = core_sys_select(n, inp, outp, exp, &timeout);

	if (tvp) {/*如果有超时值*/
		struct timeval rtv;

		if (current->personality & STICKY_TIMEOUTS)/*模拟bug的一个机制,不详细描述*/
			goto sticky;
		/*rtv中是剩余的时间*/
		rtv.tv_usec = jiffies_to_usecs(do_div((*(u64*)&timeout), HZ));
		rtv.tv_sec = timeout;
		if (timeval_compare(&rtv, &tv) >= 0)/*如果core_sys_select超时返回,更新时间*/
			rtv = tv;
		/*拷贝更新后的时间到用户空间*/
		if (copy_to_user(tvp, &rtv, sizeof(rtv))) {
sticky:
			/*
			* If an application puts its timeval in read-only
			* memory, we don't want the Linux-specific update to
			* the timeval to cause a fault after the select has
			* completed successfully. However, because we're not
			* updating the timeval, we can't restart the system
			* call.
			*/
			if (ret == -ERESTARTNOHAND)/*ERESTARTNOHAND表明,被中断的系统调用*/
				ret = -EINTR;
		}
	}

	return ret;
}






/*core_sys_select
为do_select准备好了位图,然后调用do_select,将返回的结果集,返回到用户空间
*/
static int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
						   fd_set __user *exp, s64 *timeout)
{
	fd_set_bits fds;
	void *bits;
	int ret, max_fds;
	unsigned int size;
	struct fdtable *fdt;
	/* Allocate small arguments on the stack to save memory and be faster */

	/*SELECT_STACK_ALLOC 定义为256*/
	long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

	ret = -EINVAL;
	if (n < 0)
		goto out_nofds;

	/* max_fds can increase, so grab it once to avoid race */
	rcu_read_lock();
	fdt = files_fdtable(current->files);/*获取当前进程的文件描述符表*/
	max_fds = fdt->max_fds;
	rcu_read_unlock();
	if (n > max_fds)/*修正用户传入的第一个参数:fd_set中文件描述符的最大值*/
		n = max_fds;

	/*
	* We need 6 bitmaps (in/out/ex for both incoming and outgoing),
	* since we used fdset we need to allocate memory in units of
	* long-words. 
	*/

	/*
	如果stack_fds数组的大小不能容纳下所有的fd_set,就用kmalloc重新分配一个大数组。
	然后将位图平均分成份,并初始化fds结构
	*/
	size = FDS_BYTES(n);
	bits = stack_fds;
	if (size > sizeof(stack_fds) / 6) {
		/* Not enough space in on-stack array; must use kmalloc */
		ret = -ENOMEM;
		bits = kmalloc(6 * size, GFP_KERNEL);
		if (!bits)
			goto out_nofds;
	}
	fds.in      = bits;
	fds.out     = bits +   size;
	fds.ex      = bits + 2*size;
	fds.res_in  = bits + 3*size;
	fds.res_out = bits + 4*size;
	fds.res_ex  = bits + 5*size;

	/*get_fd_set仅仅调用copy_from_user从用户空间拷贝了fd_set*/
	if ((ret = get_fd_set(n, inp, fds.in)) ||
		(ret = get_fd_set(n, outp, fds.out)) ||
		(ret = get_fd_set(n, exp, fds.ex)))
		goto out;

	zero_fd_set(n, fds.res_in);
	zero_fd_set(n, fds.res_out);
	zero_fd_set(n, fds.res_ex);


	/*
	接力棒传给了do_select
	*/
	ret = do_select(n, &fds, timeout);

	if (ret < 0)
		goto out;

	/*do_select返回,是一种异常状态*/
	if (!ret) {
		/*记得上面的sys_select不?将ERESTARTNOHAND转换成了EINTR并返回。EINTR表明系统调用被中断*/
		ret = -ERESTARTNOHAND;
		if (signal_pending(current))/*当当前进程有信号要处理时,signal_pending返回真,这符合了EINTR的语义*/
			goto out;
		ret = 0;
	}

	/*把结果集,拷贝回用户空间*/
	if (set_fd_set(n, inp, fds.res_in) ||
		set_fd_set(n, outp, fds.res_out) ||
		set_fd_set(n, exp, fds.res_ex))
		ret = -EFAULT;

out:
	if (bits != stack_fds)
		kfree(bits);/*对应上面的kmalloc*/
out_nofds:
	return ret;
}








/*do_select
真正的select在此,遍历了所有的fd,调用对应的xxx_poll函数
*/
int do_select(int n, fd_set_bits *fds, s64 *timeout)
{
	struct poll_wqueues table;
	poll_table *wait;
	int retval, i;

	rcu_read_lock();
	/*根据已经打开fd的位图检查用户打开的fd, 要求对应fd必须打开, 并且返回最大的fd*/
	retval = max_select_fd(n, fds);
	rcu_read_unlock();

	if (retval < 0)
		return retval;
	n = retval;


	/*将当前进程放入自已的等待队列table, 并将该等待队列加入到该测试表wait*/
	poll_initwait(&table);
	wait = &table.pt;

	if (!*timeout)
		wait = NULL;
	retval = 0;

	for (;;) {/*死循环*/
		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
		long __timeout;

		/*注意:可中断的睡眠状态*/
		set_current_state(TASK_INTERRUPTIBLE);

		inp = fds->in; outp = fds->out; exp = fds->ex;
		rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;


		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {/*遍历所有fd*/
			unsigned long in, out, ex, all_bits, bit = 1, mask, j;
			unsigned long res_in = 0, res_out = 0, res_ex = 0;
			const struct file_operations *f_op = NULL;
			struct file *file = NULL;

			in = *inp++; out = *outp++; ex = *exp++;
			all_bits = in | out | ex;
			if (all_bits == 0) {
				/*
				__NFDBITS定义为(8 * sizeof(unsigned long)),即long的位数。
				因为一个long代表了__NFDBITS位,所以跳到下一个位图i要增加__NFDBITS
				*/
				i += __NFDBITS;
				continue;
			}

			for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
				int fput_needed;
				if (i >= n)
					break;

				/*测试每一位*/
				if (!(bit & all_bits))
					continue;

				/*得到file结构指针,并增加引用计数字段f_count*/
				file = fget_light(i, &fput_needed);
				if (file) {
					f_op = file->f_op;
					mask = DEFAULT_POLLMASK;

					/*对于socket描述符,f_op->poll对应的函数是sock_poll
					注意第三个参数是等待队列,在poll成功后会将本进程唤醒执行*/
					if (f_op && f_op->poll)
						mask = (*f_op->poll)(file, retval ? NULL : wait);

					/*释放file结构指针,实际就是减小他的一个引用计数字段f_count*/
					fput_light(file, fput_needed);

					/*根据poll的结果设置状态,要返回select出来的fd数目,所以retval++。
					注意:retval是in out ex三个集合的总和*/
					if ((mask & POLLIN_SET) && (in & bit)) {
						res_in |= bit;
						retval++;
					}
					if ((mask & POLLOUT_SET) && (out & bit)) {
						res_out |= bit;
						retval++;
					}
					if ((mask & POLLEX_SET) && (ex & bit)) {
						res_ex |= bit;
						retval++;
					}
				}

				/*
				注意前面的set_current_state(TASK_INTERRUPTIBLE);
				因为已经进入TASK_INTERRUPTIBLE状态,所以cond_resched回调度其他进程来运行,
				这里的目的纯粹是为了增加一个抢占点。被抢占后,由等待队列机制唤醒。

				在支持抢占式调度的内核中(定义了CONFIG_PREEMPT),cond_resched是空操作
				*/ 
				cond_resched();
			}
			/*根据poll的结果写回到输出位图里*/
			if (res_in)
				*rinp = res_in;
			if (res_out)
				*routp = res_out;
			if (res_ex)
				*rexp = res_ex;
		}
		wait = NULL;
		if (retval || !*timeout || signal_pending(current))/*signal_pending前面说过了*/
			break;
		if(table.error) {
			retval = table.error;
			break;
		}

		if (*timeout < 0) {
			/*无限等待*/
			__timeout = MAX_SCHEDULE_TIMEOUT;
		} else if (unlikely(*timeout >= (s64)MAX_SCHEDULE_TIMEOUT - 1)) {
			/* 时间超过MAX_SCHEDULE_TIMEOUT,即schedule_timeout允许的最大值,用一个循环来不断减少超时值*/
			__timeout = MAX_SCHEDULE_TIMEOUT - 1;
			*timeout -= __timeout;
		} else {
			/*等待一段时间*/
			__timeout = *timeout;
			*timeout = 0;
		}

		/*TASK_INTERRUPTIBLE状态下,调用schedule_timeout的进程会在收到信号后重新得到调度的机会,
		即schedule_timeout返回,并返回剩余的时钟周期数
		*/
		__timeout = schedule_timeout(__timeout);
		if (*timeout >= 0)
			*timeout += __timeout;
	}

	/*设置为运行状态*/
	__set_current_state(TASK_RUNNING);
	/*清理等待队列*/
	poll_freewait(&table);

	return retval;
}


static unsigned int sock_poll(struct file *file, poll_table *wait)
{
	struct socket *sock;

	/*约定socket的file->private_data字段放着对应的socket结构指针*/
	sock = file->private_data;

	/*对应了三个协议的函数tcp_poll,udp_poll,datagram_poll,其中udp_poll几乎直接调用了datagram_poll
	累了,先休息一下,这三个函数以后分析*/
	return sock->ops->poll(file, sock, wait);
}




2019-07-11 09:41:31 weixin_42462202 阅读数 2326
  • sql server 2008 数据库基础应用与开发教程

    本课程全面介绍了sqlserver2008系统的体系架构和功能。本课程以章节的形式,共13章,逐步学习,内容包括sqlserver2008基础知识、t-sql语言、数据库和表、select查询、视图、索引和游标、存储过程与触发器、数据库的备份与还原、安全与权限等。

    4475 人正在学习 去看看 徐新帅

IO多路复用接口Linux内核源码剖析,源码之前,了无秘密

Linux poll内核源码剖析

Linux select内核源码剖析

Linux epoll内核源码剖析

Linux epoll内核源码剖析


前面介绍了select/poll,此文章将讲解epoll,epoll是select/poll的增强版,epoll更加高效,当然也更加复杂

epoll高效的一个重要的原因是在获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就可以

本文将先讲解如何使用epoll,然后再深入Linux内核源码分析epoll机制

epoll应用程序编写

epoll机制提供了三个系统调用epoll_createepoll_ctlepoll_wait

下面是这三个函数的原型

  • epoll_create

    int epoll_create(int size);
    

    此函数返回一个epoll的文件描述符

  • epoll_ctl

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    

    此函数是添加、删除、修改事件

    epfd:epoll的文件描述符

    op:表示选项,EPOLL_CTL_ADD(添加事件)、EPOLL_CTL_MOD(修改事件)、EPOLL_CTL_DEL(删除事件)

    fd:要操作的文件描述符

    event:事件信息

  • epoll_wait

    int epoll_wait(int epfd, struct epoll_event *events,
                   int maxevents, int timeout);
    

    此函数是等待条件满足,放回值为准备就绪的事件数

    epfd:epoll的文件描述符

    events:返回的事件信息

    maxevents:要等待的最大事件数

    timeout:超时时间

    • epoll_event

      struct epoll_event
      {
        uint32_t events;		//epoll事件
        epoll_data_t data; 	//联合体,一般表示文件描述符
      };
      

demo

下面这段代码,监听标准输入,当标准输入可读时,就打印读取到的信息

#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>

#define MAX_EVENTS 10

int main(int argc, char* argv[])
{
    struct epoll_event ev, events[MAX_EVENTS];
    int nfds, epollfd, len;
    char buf[1024];
    int n;

    epollfd = epoll_create(10);
    if (epollfd == -1)
    {
        perror("epoll_create");
        exit(-1);
    }

    ev.events = EPOLLIN;
    ev.data.fd = 0;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, 0, &ev) == -1)
    {
        perror("epoll_ctl: listen_sock");
        exit(-1);
    }

    while(1)
    {
        nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (nfds == -1) 
        {
            perror("epoll_pwait");
            exit(-1);
        }

        for (n = 0; n < nfds; ++n)
        {
            if(events[n].events & EPOLLIN)
            {
                len = read(events[n].data.fd, buf, 1024);
                buf[len] = '\0';
                printf("fd:%d; read buf:%s\n", events[n].data.fd, buf);
            }
        }
    }

    return 0;
}

epoll机制内核源码剖析

由上面的应用程序可知,epoll机制有三个系统调用,分别为epoll_createepoll_ctlepoll_wait,下面将详细地来分析这三个系统调用

epoll_create

epoll_create对应地系统调用如下

SYSCALL_DEFINE1(epoll_create, int, size)

展开后变成

long sys_epoll_create(int size)

接下来看看epoll_create做了什么事情

SYSCALL_DEFINE1(epoll_create, int, size)
{
    if (size <= 0)
        return -EINVAL;
    
    return sys_epoll_create1(0);
}

由上面的程序可以看出,指定size并没有什么作用,只是判断是否小于等于0而已

下面看一看sys_epoll_create1

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    struct eventpoll *ep = NULL;
    
    ep_alloc(&ep); //分配内存
    
    /* 定义一个epoll的文件描述符 */
    error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
                     O_RDWR | (flags & O_CLOEXEC));
    
    /* 返回文件描述符 */
    return error;
}

eventpoll起到管理epoll事件的作用,这个结果贯穿整个epoll机制,下面来看看这个结构体

struct eventpoll {
    /* 等待队列头,被sys_epoll_wait使用 */
	wait_queue_head_t wq;
	
    /* 保准准备就绪的文件描述符的一个链表 */
    struct list_head rdllist;
    
    /* 红黑树节点,epoll使用红黑树存储事件信息 */
    struct rb_root rbr;
    
   	...
};

epoll_create的作用是申请一个eventpoll结构体,申请一个epoll文件描述符,然后放回到用户空间

epoll_ctl

epoll_ctl用于添加,删除,修改事件

对应的系统调用如下

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)

展开后得到

long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user * event)

下面来详细分析

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
    
    copy_from_user(&epds, event, sizeof(struct epoll_event)); //拷贝事件
    
    switch (op) {
    	case EPOLL_CTL_ADD: //添加事件
            ep_insert(ep, &epds, tfile, fd);
            break;
         case EPOLL_CTL_DEL: //删除事件
            ep_remove(ep, epi);
            break;
    	case EPOLL_CTL_MOD: //修改事件
            ep_modify(ep, epi, &epds);
            break;
    }
    
}

在这里主要分析ep_insert添加事件,在分析之前,先将一下epitem,epoll在内核实现的时候,事件是以epitem为单位,保存到红黑树的,下面看一看epitem结构体

struct epitem {
    struct rb_node rbn; //红黑树
    struct list_head rdllink; //就绪链表
    struct eventpoll *ep; //用于指向eventpoll
    struct epoll_event event; //event事件
};

好,接下来分析ep_insert

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd)
{
    struct epitem *epi;
    struct ep_pqueue epq; //ep_pqueue结构体见下方
    
    epi = kmem_cache_alloc(epi_cache, GFP_KERNEL); //分配一个epoll项
    epi->ep = ep; //指向所属的eventpoll
    epi->event = *event; //赋值事件
    
    /* 初始化poll_table;pt->qproc = ep_ptable_queue_proc; */
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    
    /* 调用驱动程序的poll函数,具体下面分析 */
    tfile->f_op->poll(tfile, &epq.pt);
    
    /* 添加到红黑树中 */
    ep_rbtree_insert(ep, epi);
}
  • ep_pqueue

    struct ep_pqueue {
    	poll_table pt; //用于调用文件描述符的驱动程序的poll使用
    	struct epitem *epi; //epoll项
    };
    

下面分析上面的tfile->f_op->poll(tfile, &epq.pt)

一般驱动程序的poll实现如下

static unsigned int poll(struct file *fp, poll_table * wait)
{
	unsigned int mask = 0;

    /* 调用poll_wait */
	poll_wait(fp, &wq, wait); //wq为自己定义的一个等待队列头

	/* 如果条件满足,返回相应的掩码 */
	if(condition)
		mask |= POLLIN; 

	return mask;
}

看一看poll_wait什么内容

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && wait_address)
		p->qproc(filp, wait_address, p);
}

调用了p->qproc(filp, wait_address, p)函数,还记得上面程序将其初始化init_poll_funcptr(&epq.pt, ep_ptable_queue_proc)

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt); //找到对应的epoll项
    struct eppoll_entry *pwq; //具体定义看下面
    
    pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL)//分配内存
        
    /* 初始化等待队列的唤醒函数,当驱动程序唤醒等待队列时,会调用此函数(ep_poll_callback) */
    init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
    pwq->base = epi; //设置好epoll项
    add_wait_queue(whead, &pwq->wait); //将等待队列元素添加到驱动的等待队列中
}
  • eppoll_entry

    struct eppoll_entry {
    	struct epitem *base; //指向epoll项
        wait_queue_t wait; //等待队列元素
        wait_queue_head_t *whead; //等待队列头
    };
    

回到ep_insert函数,我们可知道tfile->f_op->poll(tfile, &epq.pt)做了什么事情

  • 1、添加等待队列元素到驱动的等待队列中
  • 2、初始化驱动唤醒等待队列时调用的函数,init_waitqueue_func_entry(&pwq->wait, ep_poll_callback)

epoll_wait

epoll_wait对应的系统调用如下

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)

展开后变成

long sys_epoll_wait(int epfd, struct epoll_event __user * events, int maxevents, int timeout)

下面来详细分析

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	ep_poll(ep, events, maxevents, timeout);
}
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
	init_waitqueue_entry(&wait, current); //初始化等待队列元素
    __add_wait_queue_exclusive(&ep->wq, &wait); //将等待队列元素添加到event_poll的等待队列中
    
    for (;;) {
    	set_current_state(TASK_INTERRUPTIBLE); //设置当前进程状态
        
        /* 如果就绪链表中有元素或者超时则退出 */
        if (!list_empty(&ep->rdllist) || timed_out)
            break;
    
        /* 任务调度,睡眠 */
    	schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS);
    }
    
    /* 设置进程状态 */
    set_current_state(TASK_RUNNING);

    /* 返回信息到应用层 */
	ep_send_events(ep, events, maxevents));

    /* 返回就绪的事件数 */
	return res;
}

当ep_poll调用schedule_hrtimeout_range的时候,会睡眠等待,直到驱动程序调用wake_up唤醒等待队列时,会再次醒过来,而wake_up的实现如下

...
wait_queue_t *curr;
curr->func(curr, mode, wake_flags, key)
...

会调用等待队列中的等待元素的func函数,epoll添加到驱动程序的等待队列的等待元素的func已经被初始化init_waitqueue_func_entry(&pwq->wait, ep_poll_callback)

下面来分析ep_poll_callback函数,来看看驱动程序是怎么唤醒进程的

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    struct epitem *epi = ep_item_from_wait(wait); //得到对应的epoll项
    struct eventpoll *ep = epi->ep; //得到eventpoll
    
    list_add_tail(&epi->rdllink, &ep->rdllist); //将该epoll项添加到eventpoll的就绪链表中
    
    wake_up_locked(&ep->wq); //唤醒eventpoll睡眠的进程
}

唤醒之后,继续回到ep_poll函数运行,此时会判断if (!list_empty(&ep->rdllist) || timed_out)就绪链表不为空,则退出循环,然后调用ep_send_events(ep, events, maxevents)来获取就绪链表中的epoll项的状态,接下来分析ep_send_events(ep, events, maxevents))

static int ep_send_events(struct eventpoll *ep,
			  struct epoll_event __user *events, int maxevents)
{
	struct ep_send_events_data esed;

	esed.maxevents = maxevents;
	esed.events = events;

	return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
static int ep_scan_ready_list(struct eventpoll *ep,
			      int (*sproc)(struct eventpoll *,
					   struct list_head *, void *),
			      void *priv)
{
    LIST_HEAD(txlist); //定义一个链表头
    
    list_splice_init(&ep->rdllist, &txlist); //将就绪链表中的元素交换到txlist中
    
    error = (*sproc)(ep, &txlist, priv); //调用回调函数,此回调函数被初始化为ep_send_events_proc
    
    return error; //返回就绪的事件数
}

下面来分析ep_send_events_proc函数

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
			       void *priv)
{
    struct ep_send_events_data *esed = priv; //包含事件和事件数
    
    unsigned int revents;
    struct epitem *epi;
    struct epoll_event __user *uevent;

    /* 遍历就绪链表 */
    for(...)
    {
        epi = list_first_entry(head, struct epitem, rdllink);
		list_del_init(&epi->rdllink);
        
        /* 调用驱动程序的poll获取状态 */
        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) & epi->event.events;
    
        /* 将事件状态拷贝到应用层 */
    	__put_user(revents, &uevent->events);
        __put_user(epi->event.data, &uevent->data);
        
        eventcnt++;
        uevent++;
    }
    
    return eventcnt; //返回事件数
}

到这里可以知道ep_poll函数做了什么事情

  • 1、定义一个等待队列元素,添加到eventpoll的等待队列中,等待唤醒
  • 2、当IO准备就绪时,驱动程序会调用回调函数,将就绪的epoll项添加到就绪链表中,并唤醒eventpoll的等待队列
  • 3、继续运行ep_poll函数,发现就绪链表中有元素,则跳出循环
  • 4、调用ep_send_events函数,调用就绪的epoll项对应驱动程序的poll函数,得到状态,然后再返回到应用层

至此,epoll也就分析完了

总结

epoll是select/poll的增强版,select/epoll是采用轮询的方式,而epoll是通过回调,然后将就绪的IO添加到就绪链表,然后只查询这些就绪的IO状态,从而大大减少不必要的操作,所以在IO数量较多时,epoll的性能优于select/poll

2018-07-07 11:01:04 caogenwangbaoqiang 阅读数 345
  • sql server 2008 数据库基础应用与开发教程

    本课程全面介绍了sqlserver2008系统的体系架构和功能。本课程以章节的形式,共13章,逐步学习,内容包括sqlserver2008基础知识、t-sql语言、数据库和表、select查询、视图、索引和游标、存储过程与触发器、数据库的备份与还原、安全与权限等。

    4475 人正在学习 去看看 徐新帅

1、最终用户空间的系统调用会调用到sys_select函数

asmlinkage long sys_select(int n, fd_set __user *inp, fd_set __user *outp,
            fd_set __user *exp, struct timeval __user *tvp)
{
    s64 timeout = -1;
    struct timeval tv;
    int ret;

    if (tvp) {
        if (copy_from_user(&tv, tvp, sizeof(tv)))//从用户空间拷贝时间
            return -EFAULT;

        if (tv.tv_sec < 0 || tv.tv_usec < 0)
            return -EINVAL;

        /* Cast to u64 to make GCC stop complaining */
        if ((u64)tv.tv_sec >= (u64)MAX_INT64_SECONDS)
            timeout = -1;   /* infinite */
        else {
            timeout = DIV_ROUND_UP(tv.tv_usec, USEC_PER_SEC/HZ);
            timeout += tv.tv_sec * HZ;
        }
    }

    ret = core_sys_select(n, inp, outp, exp, &timeout);//selcet的调用入口

    if (tvp) {
        struct timeval rtv;

        if (current->personality & STICKY_TIMEOUTS)
            goto sticky;
        rtv.tv_usec = jiffies_to_usecs(do_div((*(u64*)&timeout), HZ));
        rtv.tv_sec = timeout;
        if (timeval_compare(&rtv, &tv) >= 0)
            rtv = tv;
        if (copy_to_user(tvp, &rtv, sizeof(rtv))) {
sticky:
            /*
             * If an application puts its timeval in read-only
             * memory, we don't want the Linux-specific update to
             * the timeval to cause a fault after the select has
             * completed successfully. However, because we're not
             * updating the timeval, we can't restart the system
             * call.
             */
            if (ret == -ERESTARTNOHAND)
                ret = -EINTR;
        }
    }

    return ret;
}

2、继续调用core_sys_select

static int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
               fd_set __user *exp, s64 *timeout)
{
    fd_set_bits fds;
    void *bits;
    int ret, max_fds;
    unsigned int size;
    struct fdtable *fdt;
    /* Allocate small arguments on the stack to save memory and be faster */
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

    ret = -EINVAL;
    if (n < 0)
        goto out_nofds;

    /* max_fds can increase, so grab it once to avoid race */
    rcu_read_lock();
    fdt = files_fdtable(current->files);
    max_fds = fdt->max_fds;
    rcu_read_unlock();
    if (n > max_fds)
        n = max_fds;

    /*
     * We need 6 bitmaps (in/out/ex for both incoming and outgoing),需要6个位图,分别为输出和输出
     * since we used fdset we need to allocate memory in units of
     * long-words. 
     */
    size = FDS_BYTES(n);//szie大小取决于n,n=1024时,size = 128字节,对应1024位
    bits = stack_fds;//bits = 64;
    if (size > sizeof(stack_fds) / 6) {
        /* Not enough space in on-stack array; must use kmalloc */
        ret = -ENOMEM;
        bits = kmalloc(6 * size, GFP_KERNEL);
        if (!bits)
            goto out_nofds;
    }
    fds.in      = bits;
    fds.out     = bits +   size;
    fds.ex      = bits + 2*size;
    fds.res_in  = bits + 3*size;
    fds.res_out = bits + 4*size;
    fds.res_ex  = bits + 5*size;
    /*-------------------------
      | 128 | 128 | ...| 128 |
      -------------------------
      总共6个size。每个size是32个long类型存贮,因此是1024位,最大监听1024个文件描述符,每一位都代表一个fd
    */

    if ((ret = get_fd_set(n, inp, fds.in)) ||
        (ret = get_fd_set(n, outp, fds.out)) ||
        (ret = get_fd_set(n, exp, fds.ex)))//拷贝用户空间要监听的fd,分别在可写、可读、异常中复制一份
        goto out;
    zero_fd_set(n, fds.res_in);//将输出清零
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);

    ret = do_select(n, &fds, timeout);

    if (ret < 0)
        goto out;
    if (!ret) {
        ret = -ERESTARTNOHAND;
        if (signal_pending(current))
            goto out;
        ret = 0;
    }

    if (set_fd_set(n, inp, fds.res_in) ||//如果都没有设置则返回错误,或者等待时间超时
        set_fd_set(n, outp, fds.res_out) ||
        set_fd_set(n, exp, fds.res_ex))
        ret = -EFAULT;

out:
    if (bits != stack_fds)
        kfree(bits);
out_nofds:
    return ret;
}

3、继续调用do_select

int do_select(int n, fd_set_bits *fds, s64 *timeout)
{
    struct poll_wqueues table;
    poll_table *wait;
    int retval, i;

    rcu_read_lock();
    retval = max_select_fd(n, fds);//fd监听的最大值
    rcu_read_unlock();

    if (retval < 0)
        return retval;
    n = retval;

    poll_initwait(&table);
    wait = &table.pt;
    if (!*timeout)
        wait = NULL;
    retval = 0;
    for (;;) {//死循环
        unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
        long __timeout;

        set_current_state(TASK_INTERRUPTIBLE);//此任务可以被中断

        inp = fds->in; outp = fds->out; exp = fds->ex;
        rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

        for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
            const struct file_operations *f_op = NULL;
            struct file *file = NULL;

            in = *inp++; out = *outp++; ex = *exp++;
            all_bits = in | out | ex;//当前的位置没有置位,则继续下一个位循环,一个位一个位的循环遍历
            if (all_bits == 0) {
                i += __NFDBITS;
                continue;
            }

            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {//按照8*long大小轮询
                int fput_needed;
                if (i >= n)
                    break;
                if (!(bit & all_bits))
                    continue;
                file = fget_light(i, &fput_needed);//根据i查找有没有对应的虚拟文件,也就是文件描述符
                if (file) {
                    f_op = file->f_op;//获取处理函数
                    mask = DEFAULT_POLLMASK;
                    if (f_op && f_op->poll)//poll函数指针不为空
                        mask = (*f_op->poll)(file, retval ? NULL : wait);//wait是回调,是否存在事件触发
                    fput_light(file, fput_needed);
                    if ((mask & POLLIN_SET) && (in & bit)) {
                        res_in |= bit;
                        retval++;
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) {
                        res_out |= bit;
                        retval++;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) {
                        res_ex |= bit;
                        retval++;
                    }//根据每一位是否存在可读、可写、异常的设置,分别将对应的事件信息放在结果的数组中
                }
                cond_resched();
            }
            if (res_in)
                *rinp = res_in;
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
        }
        wait = NULL;
        if (retval || !*timeout || signal_pending(current))
            break;
        if(table.error) {
            retval = table.error;
            break;
        }

        if (*timeout < 0) {
            /* Wait indefinitely */
            __timeout = MAX_SCHEDULE_TIMEOUT;
        } else if (unlikely(*timeout >= (s64)MAX_SCHEDULE_TIMEOUT - 1)) {
            /* Wait for longer than MAX_SCHEDULE_TIMEOUT. Do it in a loop */
            __timeout = MAX_SCHEDULE_TIMEOUT - 1;
            *timeout -= __timeout;
        } else {
            __timeout = *timeout;
            *timeout = 0;
        }
        __timeout = schedule_timeout(__timeout);
        if (*timeout >= 0)
            *timeout += __timeout;
    }
    __set_current_state(TASK_RUNNING);

    poll_freewait(&table);

    return retval;
}

引用两张图
上面的图是说明select在进行监听前是怎样组织数据的,但是每个size不是4个字节,是按照需要监听的最大文件描述符按照有多少个long形式可以满足监听的需求,每一位代表一个文件描述符。加入监听1024个,则需要128字节,需要32个long类型的数据。如果fd为1,则只需要一个long类型的数据,同样要申请6个size的空间。
这里写图片描述
这是select怎样判断是否存在可读、可写、以及异常的事件。都是通过调用回调函数。
尤其注意的是select方式在循环检测的方法,这是select主要的模型方法。空间消耗和时间消耗比较大。
图片的来源:https://blog.csdn.net/leaf_cold/article/details/79452371
再综合看另外一张图:
这里写图片描述
比较清晰的画出了select的调用过程
图片来源:https://blog.csdn.net/zhougb3/article/details/79792089
参考文章:1、select用法&原理详解(源码剖析):https://blog.csdn.net/zhougb3/article/details/79792089
2、select 源码剖析:https://blog.csdn.net/leaf_cold/article/details/79452371

没有更多推荐了,返回首页