精华内容
下载资源
问答
  • epoll编程实例

    一、Epoll模型的三个函数

    int epoll_create(int size);

    作用:创建一个epoll句柄,告诉他需要监听的数目(也可以理解成申请一片空间,用于存放监听的套接字)
    参数一:通知内核监听size个fd,只是个建议值并与硬件有关系。(从 Linux 内核 2.6.8 版本起,size 这个参数就被忽略了,只要求 size 大于 0 即可)
    返回值:返回epoll句柄(fd)

    *int epoll_ctl(int epfd, int op, int fd, struct epoll_event event);

    作用:控制某个epoll监控的文件描述符上的事件:注册,修改、删除(也就是增添 删除 修改一个事件)
    参数一:int epfd:epoll_create()的返回值

    参数二:int op: 表示动作,用三个宏来表示
    EPOLL_CTL_ADD(注册新的fd到epfd)
    EPOLL_CTL_MOD(修改已经注册的fd监听事件)
    EPOLL_CTL_DEL(从epfd删除一个fd)

    参数三:int fd 操作对象(socket)

    参数四:struct epoll_evevt* evevt; 告诉内核需要监听的事件

    结构体如下:
    struct epoll_event {
    __uint32_t events; 宏定义读和写EPOLLIN读EPOLLOUT写
    epoll_data_t data; 联合体
    };
    联合体如下:

    typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
    } epoll_data_t;

    返回值:成功返回0,不成功返回-1

    int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)

    作用:监听红黑树上的事件,将产生动静的事件放在event这个数组内
    参数一:int epfd:epoll_create()函数返回值

    参数二:struct epoll_events* events用于回传代处理事件的数组(也就是存放产生动静的事件)

    参数三:int maxevents 同时最多产生多少事件,告诉内核events有多大,该值必须大于0

    参数四:int timeout表示 -1相当于阻塞,0相当于非阻塞,超时时间(单位:毫秒)

    返回值:成功返回产生动静事件的个数

    二、简单demo

    sever.c

    #include <stdio.h>
    #include <ctype.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <arpa/inet.h>
    #include <sys/socket.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/epoll.h>
    
    #define OPEN_MAX 1024  //最多连接数
    
    int main(int argc, char *argv[])
    {
        int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    
        int on = 1;
        int i;
        int connfd;
        setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));//设置为可重复使用的端口
    
        struct sockaddr_in serv_addr; //服务器的地址结构体
        memset(&serv_addr,0,sizeof(serv_addr));
    
        //设置本服务器要监听的地址和端口,这样客户端才能连接到该地址和端口并发送数据
        serv_addr.sin_family = AF_INET;                //选择协议族为IPV4
        serv_addr.sin_port = htons(9000);         //绑定我们自定义的端口号,客户端程序和我们服务器程序通讯时,就要往这个端口连接和传送数据
        serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); //监听本地所有的IP地址;INADDR_ANY表示的是一个服务器上所有的网卡(服务器可能不止一个网卡)多个本地ip地址都进行绑定端口号,进行侦听。
    
        bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    
        listen(listenfd, 32);
    
        int efd = epoll_create(OPEN_MAX);
    
        struct epoll_event event, events[OPEN_MAX];
        event.events=EPOLLIN|EPOLLET;
        event.data.fd = listenfd;
    
        epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &event);//把服务器fd包装成事件放在红黑树上
    
        while(1)
        {
            int nready=epoll_wait(efd,events,OPEN_MAX,-1);//判断为可读事件
            for(i=0; i<nready; i++)
            {
                if(!(events[i].events & EPOLLIN))
                {
                    continue;
                }
    
                if(events[i].data.fd == listenfd)//表示有新的连接
                {
                    struct sockaddr_in client_addr;
                    int len=sizeof(client_addr);
                    memset(&client_addr,0,sizeof(client_addr));
                    connfd = accept(listenfd, (struct sockaddr *)&client_addr, &len);
     				event.events = EPOLLIN|EPOLLET; 
    				event.data.fd = connfd;  
    				epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);               
                }
                else//表示旧的数据产生可读事件(1 客户端发来数据 2 客户端断开链接)
                {
         			connfd=events[i].data.fd;
    				char recvline [1024];
    				memset(recvline,0,1024);	
                    int nread=read(connfd,recvline,sizeof(recvline));
    				if(nread==0)
    				{
    					printf("client is close..\n"); //打印
    					epoll_ctl(efd, EPOLL_CTL_DEL, connfd, NULL);//删除果子 select是从集合 和 数组 删除
    					close(connfd);//关闭客服端 select一样
                    }
    				else
    				{
    					printf("%s",recvline);
    				}			
                }
            }
        }
    
        return 0;    
    }
    

    client.c

    #include <stdio.h>
    #include <ctype.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <arpa/inet.h>
    #include <sys/socket.h>
    #include <stdlib.h>
    #include <string.h>
    
    int main(int argc, char *argv[])
    {
        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    
        struct sockaddr_in serv_addr;
        memset(&serv_addr, 0, sizeof(serv_addr));
    
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(9000);
    
        if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr)<=0)
        {
            printf("inet_pton failed exit !\n");
            exit(1);
        }
    
        if(connect(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0)
        {
            printf("connect() failed exit!\n");
            exit(1);        
        }
    
        int len;
        char recvline[1024]; 
        char writeline[1024];
    
        while(1)
        {
            memset(recvline, 0, sizeof(recvline));
            memset(writeline, 0, sizeof(writeline));
    
            //发送消息
    		printf("send to server:");
    		fgets(writeline,sizeof(writeline),stdin);
    		write(sockfd,writeline,strlen(writeline));
    
            len = read(sockfd,recvline,1024);
    
            if(len == 0)
            {
                printf("server is close");
            }
    
            printf("receive from server:%s\n",recvline);
        }
    
        close(sockfd); //关闭套接字
        printf("end exit !\n");
        return 0;     
    
    }
    
    展开全文
  • epoll编程实例

    千次阅读 2009-09-08 09:24:00
     epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);  bzero(&serveraddr, sizeof(serveraddr));  serveraddr.sin_family = AF_INET;  const char *local_addr = "127.0.0.1";  inet_aton...

    #include <iostream>
    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <string.h>
    #include <pthread.h>

    #define MAXLINE 1024
    #define OPEN_MAX 100
    #define LISTENQ 20
    #define SERV_PORT 5555
    #define INFTIM 1000

    //线程池任务队列结构体
    struct task{
        int fd;            //需要读写的文件描述符

        struct task *next; //下一个任务
    };

    //用于保存向客户端发送一次消息所需的相关数据
    struct user_da

    ta{
        int fd;

        unsigned int n_size;

        char line[MAXLINE];
    };
     
    //线程的任务函数
    void * readtask(void *args);

    void * writetask(void *args);

    //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
    struct epoll_event ev,events[20];

    int epfd;

    pthread_mutex_t mutex;

    pthread_cond_t cond1;

    struct task *readhead=NULL,*readtail=NULL,*writehead=NULL;

    void setnonblocking(int sock)
    {
        int opts;

        opts=fcntl(sock, F_GETFL);

        if(opts<0)
        {
            perror("fcntl(sock,GETFL)");

            exit(1);
        }

        opts = opts | O_NONBLOCK;

        if(fcntl(sock, F_SETFL, opts)<0)
        {
            perror("fcntl(sock,SETFL,opts)");

            exit(1);
        }
    }

    int main()
    {
        int i, maxi, listenfd, connfd, sockfd,nfds;

        pthread_t tid1,tid2;

        struct task *new_task = NULL;

        struct user_data *rdata = NULL;

        socklen_t clilen;

        pthread_mutex_init(&mutex, NULL);

        pthread_cond_init(&cond1, NULL);

        //初始化用于读线程池的线程,开启两个线程来完成任务,两个线程会互斥地访问任务链表
        pthread_create(&tid1, NULL, readtask, NULL);

        pthread_create(&tid2, NULL, readtask, NULL);

        //生成用于处理accept的epoll专用的文件描述符   
        epfd = epoll_create(256);

        struct sockaddr_in clientaddr;

        struct sockaddr_in serveraddr;

        listenfd = socket(AF_INET, SOCK_STREAM, 0);

        //把socket设置为非阻塞方式
        setnonblocking(listenfd);

        //设置与要处理的事件相关的文件描述符
        ev.data.fd = listenfd;

        //设置要处理的事件类型,当描述符可读时出发,出发方式为ET模式
        ev.events = EPOLLIN | EPOLLET;

        //注册epoll事件
        epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

        bzero(&serveraddr, sizeof(serveraddr));

        serveraddr.sin_family = AF_INET;

        const char *local_addr = "127.0.0.1";

        inet_aton(local_addr, &(serveraddr.sin_addr)); //htons(SERV_PORT);

        serveraddr.sin_port=htons(SERV_PORT);

        bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));

        //开始监听
        listen(listenfd, LISTENQ);

        maxi = 0;

         for ( ; ; ) {

              //等待epoll事件的发生
              nfds=epoll_wait(epfd, events, 20, 500);

              //处理所发生的所有事件     
            for(i=0; i < nfds; ++i)
            {

            if(events[i].data.fd==listenfd)
            {
                connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);

                if(connfd<0){
                    perror("connfd<0");

                    exit(1);
                }

                setnonblocking(connfd);

                const char *str = inet_ntoa(clientaddr.sin_addr);

                std::cout<<"connec_ from >> " << str << std::endl;

                //设置用于读操作的文件描述符
                ev.data.fd=connfd;

                //设置用于注测的读操作事件
                ev.events=EPOLLIN | EPOLLET;

                //注册ev
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            }
            else if(events[i].events & EPOLLIN)
            {
                printf("reading!/n");                

                if ( (sockfd = events[i].data.fd) < 0) continue;

                new_task = new task();

                new_task->fd =sockfd;

                new_task->next = NULL;

                //添加新的读任务
                pthread_mutex_lock(&mutex);

                if(readhead == NULL)
                {
                    readhead = new_task;

                    readtail = new_task;
                }   
                else
                {   
                    readtail->next = new_task;

                    readtail = new_task;
                }   

                //唤醒所有等待cond1条件的线程
                pthread_cond_broadcast(&cond1);

                pthread_mutex_unlock(&mutex);  
            }
            else if(events[i].events & EPOLLOUT)
            {   
                rdata=(struct user_data *)events[i].data.ptr;

                 sockfd = rdata->fd;

                 write(sockfd, rdata->line, rdata->n_size);

                 delete rdata;

                 //设置用于读操作的文件描述符
                 ev.data.fd=sockfd;

                 //设置用于注测的读操作事件
                ev.events=EPOLLIN | EPOLLET;

                 //修改sockfd上要处理的事件为EPOLIN
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
            }
              }
         }
    }

    void * readtask(void *args)
    {
        int fd=-1;

        unsigned int n;

        //用于把读出来的数据传递出去
        struct user_data *data = NULL;

        while(1){

            //互斥访问任务队列
            pthread_mutex_lock(&mutex);

            //等待到任务队列不为空
            while(readhead == NULL)
                 pthread_cond_wait(&cond1, &mutex); //线程阻塞,释放互斥锁,当等待的条件等到满足时,它会再次获得互斥锁

            fd = readhead->fd;

            //从任务队列取出一个读任务
            struct task *tmp = readhead;

            readhead = readhead->next;

            delete tmp;

            pthread_mutex_unlock(&mutex);

            data = new user_data();

            data->fd=fd;

            if ( (n = read(fd, data->line, MAXLINE)) < 0) {
                if (errno == ECONNRESET)
                    close(fd);
                else
                    std::cout<<"readline error"<< std::endl;

                if(data != NULL) delete data;
            } else if (n == 0) {
            //客户端关闭了,其对应的连接套接字可能也被标记为EPOLLIN,然后服务器去读这个套接字
            //结果发现读出来的内容为0,就知道客户端关闭了。
                close(fd);

               printf("Client close connect!/n");

               if(data != NULL) delete data;

            } else{

            std::cout << "read from client: " << data->line << std::endl;

            data->n_size = n;

            //设置需要传递出去的数据
            ev.data.ptr = data;

            //设置用于注测的写操作事件
            ev.events = EPOLLOUT | EPOLLET;

            //修改sockfd上要处理的事件为EPOLLOUT
            epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
               }
        }
    }

    =========================================================
    给出一个简单的客户端吧,从《Linux编程技术详解》书中拷贝而来。
    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/un.h>
    #include <netdb.h>
    #include <unistd.h>

    int main(int argc,char *argv[])
    {
        int connect_fd;
        int ret;
        char snd_buf[1024];
        int i;
        int port;
        int len;

        static struct sockaddr_in srv_addr;

        if(argc!=3){
            printf("Usage: %s server_ip_address port/n",argv[0]);
            return 1;
        }   

        port=atoi(argv[2]);

        connect_fd=socket(PF_INET,SOCK_STREAM,0);
        if(connect_fd<0){
            perror("cannot create communication socket");
            return 1;
        }   

        memset(&srv_addr,0,sizeof(srv_addr));
        srv_addr.sin_family=AF_INET;
        srv_addr.sin_addr.s_addr=inet_addr(argv[1]);
        srv_addr.sin_port=htons(port);

        ret=connect(connect_fd,(struct sockaddr*)&srv_addr,sizeof(srv_addr));
        if(ret==-1){
            perror("cannot connect to the server");
            close(connect_fd);
            return 1;
        }
       
        memset(snd_buf,0,1024);

        while(1){
            write(STDOUT_FILENO,"input message:",14);

            bzero(snd_buf, 1024);
           
            len=read(STDIN_FILENO,snd_buf,1024);

            if(snd_buf[0]=='@')
                break;

            if(len>0)
                write(connect_fd,snd_buf,len);

            len=read(connect_fd,snd_buf,len);
            if(len>0)
                printf("Message from server: %s/n",snd_buf);
        }
       
        close(connect_fd);
        return 0;
    }

    ===========================================
    ecy@ecy-geek:~/C$ ./epoll_server
    connec_ from >> 127.0.0.1
    reading!
    read from client: ni hao ya ya ya ya ya

    reading!
    read from client: hello world

    reading!
    Client close connect!

    ecy@ecy-geek:~/C$ pstree | grep epoll
         |-gnome-terminal-+-bash---epoll_server---2*[{epoll_server}]

    ecy@ecy-geek:~/C$ ./p13.5 127.0.0.1 5555
    input message:ni hao ya ya ya ya ya
    Message from server: ni hao ya ya ya ya ya

    input message:hello world
    Message from server: hello world

    input message:@
    展开全文
  • Linux Epoll 编程实例

    2012-06-24 11:19:49
    文档是一个如何使用epoll在linux下编程的一个demo,没有任何bug,简单易懂。
  • Linux下多线程epoll编程实例

    千次阅读 2017-04-16 20:44:27
    ...Linux下多线程epoll编程,在高并发下测试通过,可以支持10000用户同时在线,测试服务器为Linode的vps服务器,操作系统为Centos64    // cs_network.cpp    // created by ccc   

    文章来源:http://blog.csdn.net/susubuhui/article/details/37906287

    1. Linux下多线程epoll编程,在高并发下测试通过,可以支持10000用户同时在线,测试服务器为Linode的vps服务器,操作系统为Centos64  
    2.   
    3. // cs_network.cpp  
    4.   
    5. // created by ccc  
    6.   
    7. #include "config.h"  
    8. #include "cs_network.h"  
    9. #include <iostream>  
    10. #include <sys/socket.h>  
    11.   
    12. #define VERSION_SOLARIS 0  
    13.   
    14. #if VERSION_SOLARIS  
    15.  #include <port.h>  
    16. #else  
    17.  #include <sys/epoll.h>  
    18. #endif  
    19.   
    20. #include <netinet/in.h>  
    21. #include <arpa/inet.h>  
    22. #include <fcntl.h>  
    23. #include <unistd.h>  
    24. #include <stdio.h>  
    25. #include <errno.h>  
    26. #include <stdlib.h>  
    27. #include <string.h>  
    28. #include <pthread.h>  
    29. #include "cs_data_inspect.h"  
    30. #include "cs_heart_thread.h"  
    31. #include "cs_gdata.h"  
    32. #include "cs_work_thread.h"  
    33.   
    34. #define SERV_PORT  5565 // 服务器端口  
    35. #define LISTENQ          128   // listen sock 参数  
    36. #define MAX_EPOLL_EVENT_COUNT      500   // 同时监听的 epoll 数  
    37. #define MAX_READ_BUF_SIZE    1024 * 8 // 最大读取数据 buf  
    38.   
    39. #define SOCKET_ERROR -1  
    40.   
    41. static int epfd;  
    42. static int listenfd;  
    43.   
    44. static pthread_t tids[NETWORK_READ_THREAD_COUNT];  
    45. static pthread_mutex_t mutex_thread_read;  
    46. static pthread_cond_t cond_thread_read;  
    47. static void *thread_read_tasks(void *args);  
    48.   
    49. static st_io_context_task *read_tasks_head = NULL;  
    50. static st_io_context_task *read_tasks_tail = NULL;  
    51.   
    52. //  
    53.   
    54.   
    55. static void append_read_task(st_context *context)  
    56. {  
    57.  st_io_context_task *new_task = NULL;  
    58.   
    59.  new_task = new st_io_context_task();  
    60.  new_task->context = context;  
    61.   
    62.  pthread_mutex_lock(&mutex_thread_read);  
    63.   
    64.  if(read_tasks_tail == NULL)  
    65.  {  
    66.   read_tasks_head = new_task;  
    67.   read_tasks_tail = new_task;  
    68.  }     
    69.  else  
    70.  {     
    71.   read_tasks_tail->next= new_task;  
    72.   read_tasks_tail = new_task;  
    73.  }    
    74.   
    75.  pthread_cond_broadcast(&cond_thread_read);  
    76.  pthread_mutex_unlock(&mutex_thread_read);   
    77. }  
    78.   
    79. void _setnonblocking(int sock)  
    80. {  
    81.  int opts;  
    82.  opts = fcntl(sock, F_GETFL);  
    83.  if(opts < 0){  
    84.   log("fcntl(sock,GETFL)");  
    85.   exit(1);  
    86.  }  
    87.   
    88.  opts = opts | O_NONBLOCK;  
    89.  if(fcntl(sock, F_SETFL, opts)<0){  
    90.   log("fcntl(sock,SETFL,opts)");  
    91.   exit(1);  
    92.  }  
    93. }  
    94.   
    95.   
    96. void* get_network_event(void *param)  
    97. {  
    98.  long network_event_id;  
    99.   
    100.  int i, sockfd;  
    101.   
    102.  network_event_id = (long) param;  
    103.   
    104.  log("begin thread get_network_event: %ld", network_event_id);  
    105.   
    106.  st_context *context = NULL;  
    107.   
    108. #if VERSION_SOLARIS  
    109.  uint_t nfds;  
    110.  port_event_t now_ev, ev, events[MAX_EPOLL_EVENT_COUNT];  
    111. #else  
    112.  unsigned nfds;  
    113.  struct epoll_event now_ev, ev, events[MAX_EPOLL_EVENT_COUNT];  
    114. #endif  
    115.   
    116.   
    117. #if VERSION_SOLARIS  
    118.  struct timespec timeout;  
    119.  timeout.tv_sec = 0;  
    120.  timeout.tv_nsec = 50000000;  
    121. #endif  
    122.   
    123.  while(1)   
    124.  {  
    125. #if VERSION_SOLARIS  
    126.   nfds = MAX_EPOLL_EVENT_COUNT;  
    127.   if (port_getn(epfd, events, MAX_EPOLL_EVENT_COUNT, &nfds, &timeout) != 0){  
    128.   
    129.    if (errno != ETIME){  
    130.     log("port_getn error");  
    131.     return false;  
    132.    }  
    133.   }  
    134.   
    135.   if (nfds == 0){  
    136.    continue;  
    137.   }  
    138.   else{  
    139.    // log("on port_getn: %d", nfds);  
    140.   }  
    141. #else  
    142.   //等待epoll事件的发生  
    143.   nfds = epoll_wait(epfd, events, MAX_EPOLL_EVENT_COUNT, 100000);  
    144. #endif  
    145.   
    146.   //处理所发生的所有事件  
    147.   for(i = 0; i < nfds; i++)  
    148.   {  
    149.    now_ev = events[i];  
    150.   
    151. #if VERSION_SOLARIS  
    152.    context = (st_context *)now_ev.portev_user;  
    153. #else  
    154.    context = (st_context *)now_ev.data.ptr;  
    155. #endif  
    156.   
    157. #if VERSION_SOLARIS  
    158.    if (now_ev.portev_source != PORT_SOURCE_FD){  
    159.     continue;  
    160.    }  
    161.   
    162.    if(now_ev.portev_object == listenfd)  
    163. #else  
    164.    if(context->fd == listenfd)  
    165. #endif  
    166.    {  
    167. #if VERSION_SOLARIS  
    168.     // 重新关联listen fd  
    169.     port_associate(epfd, PORT_SOURCE_FD, listenfd, POLLIN, context);  
    170. #endif  
    171.   
    172.     //append_read_task(NULL, true);  
    173.     int connfd;  
    174.     struct sockaddr_in clientaddr = {0};  
    175.     socklen_t clilen = sizeof(clientaddr);  
    176.   
    177.     connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);  
    178.   
    179.     if(connfd == -1){  
    180.      log("connfd == -1 [%d]", errno);  
    181.      continue;  
    182.     }  
    183.   
    184.     _setnonblocking(connfd);  
    185.     int nRecvBuf=128*1024;//设置为32K  
    186.     setsockopt(connfd, SOL_SOCKET,SO_RCVBUF,(const char*)&nRecvBuf,sizeof(int));  
    187.     //发送缓冲区  
    188.     int nSendBuf=128*1024;//设置为32K  
    189.     setsockopt(connfd, SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int));  
    190.   
    191.     int nNetTimeout=1000;//1秒  
    192.     //发送时限  
    193.     setsockopt(connfd, SOL_SOCKET, SO_SNDTIMEO, (const char *)&nNetTimeout, sizeof(int));  
    194.   
    195.     //接收时限  
    196.     setsockopt(connfd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&nNetTimeout, sizeof(int));  
    197.   
    198.     const char *remote_addr = inet_ntoa(clientaddr.sin_addr);  
    199.     int  remote_port = ntohs(clientaddr.sin_port);  
    200.   
    201.     context = query_context(connfd, remote_addr, remote_port);  
    202.   
    203.     mutex_lock(mutex_id_pool.mutex);  
    204.     context->id = add_client(context);  
    205.     mutex_unlock(mutex_id_pool.mutex);  
    206.   
    207.  //#if IS_DEBUG  
    208.  //   log("new obj fd: %d, id: %d context:%8X", connfd, context->id, context);  
    209.  //#endif  
    210.   
    211. #if VERSION_SOLARIS  
    212.     port_associate(epfd, PORT_SOURCE_FD, connfd, POLLIN, context);  
    213. #else  
    214.     struct epoll_event ev;  
    215.   
    216.     ev.events = EPOLLIN | EPOLLET;  
    217.     ev.data.ptr = context;  
    218.     epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);  
    219. #endif  
    220.    }  
    221.   
    222. #if VERSION_SOLARIS  
    223.    else if(now_ev.portev_events == POLLIN)  
    224.    {  
    225.     //log("on: now_ev.portev_events == POLLIN");  
    226.     append_read_task(context);  
    227.    }  
    228.    else{  
    229.     log("unknow portev_events: %d", now_ev.portev_events);  
    230.    }  
    231. #else  
    232.    else if(now_ev.events & EPOLLIN)  
    233.    {  
    234.     append_read_task(context);  
    235.    }  
    236.    else if(now_ev.events & EPOLLOUT)  
    237.    {   
    238.     sockfd = context->fd;  
    239.   
    240.     struct epoll_event ev;  
    241.   
    242.     ev.events = EPOLLIN | EPOLLET;  
    243.     ev.data.ptr = context;  
    244.     epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);  
    245.    }  
    246.    else if(now_ev.events & EPOLLHUP)  
    247.    {  
    248.     log("else if(now_ev.events & EPOLLHUP)");  
    249.     del_from_network(context);  
    250.    }  
    251.    else  
    252.    {  
    253.     log("[warming]other epoll event: %d", now_ev.events);  
    254.    }  
    255. #endif  
    256.   }  
    257.  }  
    258.   
    259.  return NULL;  
    260. }  
    261.   
    262. bool start_network()  
    263. {  
    264.  int i, sockfd;  
    265.  int optionVal = 0;  
    266.   
    267.  st_context *context = NULL;  
    268.   
    269. #if VERSION_SOLARIS  
    270.  uint_t nfds;  
    271.  port_event_t ev;  
    272. #else  
    273.  unsigned nfds;  
    274.  struct epoll_event ev;  
    275. #endif  
    276.   
    277.  pthread_mutex_init(&mutex_thread_read, NULL);  
    278.   
    279.  pthread_cond_init(&cond_thread_read, NULL);  
    280.   
    281.  int ret;  
    282.  pthread_attr_t tattr;  
    283.   
    284.  /* Initialize with default */  
    285.  if(ret = pthread_attr_init(&tattr)){  
    286.   perror("Error initializing thread attribute [pthread_attr_init(3C)] ");  
    287.   return (-1);  
    288.  }  
    289.   
    290.  /* Make it a bound thread */  
    291.  if(ret = pthread_attr_setscope(&tattr, PTHREAD_SCOPE_SYSTEM)){  
    292.   perror("Error making bound thread [pthread_attr_setscope(3C)] ");  
    293.   return (-1);  
    294.  }  
    295.   
    296.  //初始化用于读线程池的线程,开启两个线程来完成任务,两个线程会互斥地访问任务链表  
    297.  for (i = 0; i < NETWORK_READ_THREAD_COUNT; i++){  
    298.   
    299.   pthread_create(&tids[i], &tattr, thread_read_tasks, NULL);  
    300.   //log("new read task thread %8X created.", tids[i]);  
    301.  }  
    302.   
    303. #if VERSION_SOLARIS  
    304.  epfd = port_create();  
    305. #else  
    306.  epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);  
    307. #endif  
    308.   
    309.  struct sockaddr_in serveraddr;  
    310.   
    311.  if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1){  
    312.   
    313.   log("can't create socket.\n");  
    314.   
    315.   return false;  
    316.  }  
    317.   
    318.  //把socket设置为非阻塞方式  
    319.  _setnonblocking(listenfd);  
    320.   
    321.  optionVal = 0;  
    322.  setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,  &optionVal, sizeof(optionVal));  
    323.   
    324.  {  
    325.   //设置与要处理的事件相关的文件描述符  
    326.   context = query_context(listenfd, "localhost", SERV_PORT);  
    327.   
    328. #if VERSION_SOLARIS  
    329.   //注册epoll事件  
    330.   port_associate(epfd, PORT_SOURCE_FD, listenfd, POLLIN, context);  
    331. #else  
    332.   ev.data.ptr = context;  
    333.   ev.events = EPOLLIN;  
    334.   epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);  
    335. #endif  
    336.  }  
    337.   
    338.  memset(&serveraddr, 0, sizeof(serveraddr));  
    339.  serveraddr.sin_family = AF_INET;  
    340.  serveraddr.sin_port = htons(SERV_PORT);  
    341.  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);  
    342.   
    343.  if (bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr))  == -1){  
    344.   
    345.   log("bind error: %d\n", errno);  
    346.   
    347.   return false;  
    348.  }  
    349.   
    350.  //开始监听  
    351.  if (listen(listenfd, LISTENQ) == -1){  
    352.   
    353.   log("listen error: %d\n", errno);  
    354.   
    355.   return false;  
    356.  }  
    357.   
    358.  log("");  
    359.  log("**************************************");  
    360.  log("********   cserver start ok   ********");  
    361.  log("**************************************");  
    362.    
    363.  pthread_t tids_get_network_event[NETWORK_READ_THREAD_COUNT];  
    364.   
    365.  for (int i = 1; i <= 2; i++){  
    366.   pthread_create(&tids_get_network_event[i], &tattr, get_network_event, (void*)i);  
    367.  }  
    368.   
    369.  get_network_event(NULL);  
    370.   
    371.  return true;  
    372. }  
    373.   
    374. void shutdown_network()  
    375. {  
    376.  log("begin shutdown network ...");  
    377. }  
    378.   
    379. void *thread_read_tasks(void *args)  
    380. {  
    381.  bool is_listenfd;  
    382.  st_context *context;  
    383.   
    384.  while(1)  
    385.  {  
    386.   //互斥访问任务队列  
    387.   pthread_mutex_lock(&mutex_thread_read);  
    388.   
    389.   //等待到任务队列不为空  
    390.   while(read_tasks_head == NULL)  
    391.    pthread_cond_wait(&cond_thread_read, &mutex_thread_read); //线程阻塞,释放互斥锁,当等待的条件等到满足时,它会再次获得互斥锁  
    392.   
    393.   context     = read_tasks_head->context;  
    394.   
    395.   //从任务队列取出一个读任务  
    396.   st_io_context_task *tmp = read_tasks_head;  
    397.   read_tasks_head = read_tasks_head->next;  
    398.   delete tmp;  
    399.   
    400.   if (read_tasks_head == NULL){  
    401.    read_tasks_tail = NULL;  
    402.   }  
    403.   
    404.   pthread_mutex_unlock(&mutex_thread_read);  
    405.   
    406.   {  
    407.    char buf_read[MAX_READ_BUF_SIZE];  
    408.    int  read_count;  
    409.   
    410.    read_count = recv(context->fd, buf_read, sizeof(buf_read), 0);  
    411.   
    412.    //log("read id[%d]errno[%d]count[%d]", context->id, errno, read_count);  
    413.   
    414.    if (read_count < 0)  
    415.    {  
    416.     if (errno == EAGAIN){  
    417.      continue;  
    418.     }  
    419.   
    420.     log("1 recv < 0: errno: %d", errno);  
    421.     //if (errno == ECONNRESET){  
    422.     // log("client[%s:%d] disconnect ", context->remote_addr, context->remote_port);  
    423.     //}  
    424.   
    425.     del_from_network(context);  
    426.     continue;  
    427.    }  
    428.    else if (read_count == 0)  
    429.    {  
    430.     //客户端关闭了,其对应的连接套接字可能也被标记为EPOLLIN,然后服务器去读这个套接字  
    431.     //结果发现读出来的内容为0,就知道客户端关闭了。  
    432.     log("client close connect! errno[%d]", errno);  
    433.   
    434.     del_from_network(context);  
    435.     continue;  
    436.    }   
    437.    else  
    438.    {  
    439.     do   
    440.     {  
    441.      if (! on_recv_data(context, buf_read, read_count)){  
    442.       context->is_illegal = true;  
    443.   
    444.       log("当前客户数据存在异常");  
    445.       del_from_network(context);  
    446.       break;  
    447.      }  
    448.   
    449.      read_count = read(context->fd, buf_read, sizeof(buf_read));  
    450.   
    451.      if (read_count <= 0){  
    452.       if (errno == EINTR)  
    453.        continue;  
    454.       if (errno == EAGAIN){  
    455. #if VERSION_SOLARIS  
    456.        port_associate(epfd, PORT_SOURCE_FD, context->fd, POLLIN, context);  
    457. #endif  
    458.        break;  
    459.       }  
    460.   
    461.   
    462.       log("2 error read_count < 0, errno[%d]", errno);  
    463.   
    464.       del_from_network(context);  
    465.       break;  
    466.      }  
    467.     }  
    468.     while(1);  
    469.    }  
    470.   }  
    471.  }  
    472.   
    473.  log("thread_read_tasks end ....");  
    474. }  
    475.   
    476. void del_from_network(st_context *context, bool is_card_map_locked)  
    477. {  
    478.  gdata.lock();  
    479.  if (gdata.manager == context){  
    480.   gdata.manager = NULL;  
    481.  }  
    482.  gdata.unlock();  
    483.   
    484.  context->lock();  
    485.   
    486.  if (! context->valide){  
    487.   log("del_from_network is not valide");  
    488.   context->unlock();  
    489.   return;  
    490.  }  
    491.   
    492.  // 断开连接  
    493.  int fd = context->fd;  
    494.   
    495.  if (fd != -1){  
    496. #if VERSION_SOLARIS  
    497.   port_dissociate(epfd, PORT_SOURCE_FD, fd);  
    498. #else  
    499.   struct epoll_event ev;  
    500.   ev.data.fd = fd;  
    501.   epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev);  
    502. #endif  
    503.   close_socket(fd);  
    504.   context->fd = -1;  
    505.  }  
    506.   
    507.  context->valide = false;  
    508.  log("client[%d] is invalide", context->id);  
    509.  context->unlock();  
    510.   
    511.  mutex_lock(mutex_id_pool.mutex);  
    512.  del_client(context->id);  
    513.  mutex_unlock(mutex_id_pool.mutex);  
    514. }  
    515.   
    516. bool context_write(st_context *context, const char* buf, int len)  
    517. {  
    518.  if (len <= 0) return true;  
    519.   
    520.  int fd = context->fd;  
    521.   
    522.  if (fd == -1){  
    523.   return false;  
    524.  }  
    525.   
    526.  int nleft = len;  
    527.  int nsend;  
    528.   
    529.  bool result = true;  
    530.   
    531.     while(nleft > 0){  
    532.         nsend = write(fd, &buf[len - nleft], nleft);  
    533.   
    534.         if (nsend < 0) {  
    535.   
    536.             if(errno == EINTR) continue;  
    537.    if(errno == EAGAIN) {  
    538.     break;  
    539.    }  
    540.   
    541.             result = false;  
    542.    break;  
    543.         }  
    544.         else if (nsend == 0){  
    545.   
    546.             result = false;  
    547.    break;  
    548.         }  
    549.         nleft -= nsend;  
    550.     }  
    551.   
    552.  return result;  
    553. }  

    展开全文
  • Linux下基于epoll高并发服务器实现

    该编程实例是摘取了Linux C++网络编程的epoll实战部分完成了epoll类的封装和连接池部分,后面的文章会继续完善收发包、线程池、心跳以及收发包安全部分。

    net_socket.h

    #ifndef __NET_SOCKET_H__
    #define __NET_SOCKET_H__
    
    #include <vector>  
    #include <list>        
    #include <sys/epoll.h>
    #include <sys/socket.h>
    
    #define NGX_LISTEN_BACKLOG  511    
    #define NGX_MAX_EVENTS      512    
    
    typedef struct net_listening_s net_listening_t, *lpnet_listening_t;
    typedef struct net_connection_s  net_connection_t,*lpnet_connection_t;
    typedef class  CSocket           CSocket;
    
    typedef void (CSocket::*net_event_handler_pt)(lpnet_connection_t c); //定义成员函数指针
    
    struct net_listening_s
    {
        int                 port;
        int                 fd;
        lpnet_connection_t  connection;//连接池中的一个连接
    };
    
    struct net_connection_s
    {
    	
    	int                       fd;             
    	lpnet_listening_t         listening;      		
    
    	//------------------------------------	
    	unsigned                  instance:1;     
    	uint64_t                  iCurrsequence; 
    	struct sockaddr           s_sockaddr;   
    	//char                      addr_text[100]; 
    
    	//和读有关的标志-----------------------
    	//uint8_t                   r_ready;       
    	uint8_t                   w_ready;        
    
    	net_event_handler_pt      rhandler;       
    	net_event_handler_pt      whandler;       
    	
    	//和收包有关
    	unsigned char             curStat;                        
    	char                      dataHeadInfo[20];   			
    	char                      *precvbuf;                     
    	unsigned int              irecvlen;                       
    
    	bool                      ifnewrecvMem;                   
    	char                      *pnewMemPointer;               
    
    	//--------------------------------------------------
    	lpnet_connection_t        data; 
    };
    
    
    class CSocket
    {   
    
    public:
        CSocket();
        virtual ~CSocket();
        virtual bool Initialize();
    
    public:
        int net_epoll_init();
    
    	int net_epoll_add_event(int fd,int readevent,int writeevent,uint32_t otherflag,uint32_t eventtype,lpnet_connection_t c);     
    
    	int net_epoll_process_events(int timer);
    
    	//一些业务处理函数handler
    	void net_event_accept(lpnet_connection_t oldc);
    
    	void net_wait_request_handler(lpnet_connection_t c); 
    
    private:
        bool net_open_listening_sockets();
    	
        bool setnonblocking(int sockfd);  
    
    
    	ssize_t recvproc(lpnet_connection_t c,char *buff,ssize_t buflen);  //接收从客户端来的数据专用函数
    
    	lpnet_connection_t net_get_connection(int isock);   
    	void net_close_connection(lpnet_connection_t c);
    	void net_free_connection(lpnet_connection_t c);
    private:
        int                            m_worker_connections;
        int                            m_ListenPortCount;     //所监听的端口数量
    	int                            m_epollhandle;
    
    	lpnet_connection_t             m_pconnections;                     //注意这里可是个指针,其实这是个连接池的首地址
    	lpnet_connection_t             m_pfree_connections;
    	
    	int                            m_connection_n;                     //当前进程中所有连接对象的总数【连接池大小】
    	int                            m_free_connection_n;  
        
    	std::vector<lpnet_listening_t> m_ListenSocketList; 
    	struct epoll_event             m_events[NGX_MAX_EVENTS]; 
    };
    
    
    #endif
    

    net_socket.cpp

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    
    #include <stdint.h>    
    #include <stdarg.h>
    #include <unistd.h>
    #include <errno.h>     //errno
    
    #include <sys/ioctl.h> 
    #include <arpa/inet.h>
    
    
    #include "net_socket.h"
    
    CSocket::CSocket()
    {
        m_worker_connections = 1024;
        m_ListenPortCount = 1;
        m_epollhandle = -1;
        m_pconnections = NULL;
        m_pfree_connections = NULL;
    
        return;
    }
    
    CSocket::~CSocket()
    {
        std::vector<lpnet_listening_t>::iterator pos;
        for(pos = m_ListenSocketList.begin(); pos != m_ListenSocketList.end(); ++pos)
        {
            delete (*pos);
        }
    
        m_ListenSocketList.clear();
    
        if(m_pconnections != NULL)
        {
            delete [] m_pconnections;
        }
    
        return;
    }
    
    bool CSocket::Initialize()
    {
        bool ret = net_open_listening_sockets();
    
        return ret;
    }
    
    bool CSocket::net_open_listening_sockets()
    {
        int                isock;
        struct sockaddr_in serv_addr;
        int                iport;
        char               strinfo[100];
    
        memset(&serv_addr, 0, sizeof(serv_addr)); 
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);  
    
        for(int i=0; i < m_ListenPortCount; i++)
        {
            isock = socket(AF_INET, SOCK_STREAM, 0);
            if(isock == -1)
            {
                return false;
            }
    
            //解决TIME_WAIT这个状态导致bind()失败的问题
            int reuseaddr = 1;
            if(setsockopt(isock,SOL_SOCKET, SO_REUSEADDR,(const void *) &reuseaddr, sizeof(reuseaddr)) == -1)
            {
                close(isock);                                                
                return false;
            }
    
            //设置为非阻塞
            if(setnonblocking(isock) == false)
            {
                close(isock);                                                
                return false;
            }
    /*
            //设置本服务器要监听的地址和端口,这样客户端才能连接到该地址和端口并发送数据        
            strinfo[0] = 0;
            sprintf(strinfo,"ListenPort%d",i);
            iport = p_config->GetIntDefault(strinfo,10000);
            serv_addr.sin_port = htons((in_port_t)iport);   //in_port_t其实就是uint16_t
    */
    
            serv_addr.sin_port = htons(9000); 
            if(bind(isock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1)
            {
                close(isock);                                                
                return false;            
            }        
    
            if(listen(isock, NGX_LISTEN_BACKLOG) == -1)
            {
                close(isock);                                                
                return false;              
            }
    
            lpnet_listening_t p_listensocketitem = new net_listening_t;
            memset(p_listensocketitem, 0, sizeof(net_listening_t));
            p_listensocketitem->port = iport;
            p_listensocketitem->fd = isock; 
            m_ListenSocketList.push_back(p_listensocketitem);                
        }
    
        if(m_ListenSocketList.size() <= 0)  //不可能一个端口都不监听吧
            return false;
    
        return true;
    }
    
    bool CSocket::setnonblocking(int sockfd)
    {
       int nb=1; //0:清除,1:设置  
        if(ioctl(sockfd, FIONBIO, &nb) == -1) //FIONBIO:设置/清除非阻塞I/O标记:0:清除,1:设置
        {
            return false;
        }
        return true;
    
        //如下也是一种写法,跟上边这种写法其实是一样的,但上边的写法更简单
        /* 
        //fcntl:file control【文件控制】相关函数,执行各种描述符控制操作
        //参数1:所要设置的描述符,这里是套接字【也是描述符的一种】
        int opts = fcntl(sockfd, F_GETFL);  //用F_GETFL先获取描述符的一些标志信息
        if(opts < 0) 
        {
            return false;
        }
        opts |= O_NONBLOCK; //把非阻塞标记加到原来的标记上,标记这是个非阻塞套接字【如何关闭非阻塞呢?opts &= ~O_NONBLOCK,然后再F_SETFL一下即可】
        if(fcntl(sockfd, F_SETFL, opts) < 0) 
        {
            return false;
        }
        return true;
        */    
    }
    
    int CSocket::net_epoll_init()
    {
        m_epollhandle = epoll_create(m_worker_connections);
        if(m_epollhandle == -1)
        {
            exit(2);
        }
    
        m_connection_n = m_worker_connections;
    
        m_pconnections = new net_connection_t[m_connection_n];
    
        int i = m_connection_n;
        lpnet_connection_t next = NULL;
        lpnet_connection_t c = m_pconnections;
    
        do
        {
            i--;
    
            c[i].data = next;
            c[i].fd = -1;
            c[i].instance = 1;
            c[i].iCurrsequence = 0;
    
            next = &c[i];
        }while(i);
    
        m_pfree_connections = next;
        m_free_connection_n = m_connection_n;
    
        std::vector<lpnet_listening_t>::iterator pos;
        for(pos = m_ListenSocketList.begin(); pos != m_ListenSocketList.end(); ++pos)
        {
            c = net_get_connection((*pos)->fd);
            if(c == NULL)
            {
                exit(2);
            }
    
            c->listening = (*pos);
            (*pos)->connection = c;
    
            c->rhandler = &CSocket::net_event_accept;
    
            if(net_epoll_add_event((*pos)->fd,
                                    1,0,
                                    0,
                                    EPOLL_CTL_ADD,
                                    c
                                    ) == -1)
            {
                exit(2);
            }
        }
    
        return 1;
    }
    
    lpnet_connection_t CSocket::net_get_connection(int isock)
    {
        lpnet_connection_t c = m_pfree_connections;
    
        if(c == NULL)
        {
            return NULL;
        }
    
        m_pfree_connections = c->data;
        m_free_connection_n--;
    
        uintptr_t instance = c->instance;
        uint64_t   iCurrsequence = c->iCurrsequence;    
    
        memset(c, 0, sizeof(net_connection_t));
        c->fd = isock;
        //c->curStat = _PKG_HD_INIT;   
        c->precvbuf = c->dataHeadInfo;
        //c->irecvlen = sizeof(COMM_PKG_HEADER);
        c->irecvlen = 1024;
        c->ifnewrecvMem = false;                             //标记我们并没有new内存,所以不用释放	 
        c->pnewMemPointer = NULL;
    
        c->instance = !instance; 
        c->iCurrsequence=iCurrsequence;++c->iCurrsequence;
       
        return c;
    }
    
    void CSocket::net_free_connection(lpnet_connection_t c)
    {
        if(c->ifnewrecvMem == true)
        {
    
        }
    
        c->data = m_pfree_connections;
    
        ++c->iCurrsequence;
    
        m_pfree_connections = c;
        ++m_free_connection_n;
        
        return;
    }
    
    void CSocket::net_close_connection(lpnet_connection_t c)
    {
        if(close(c->fd) == -1)
        {
    
        }
        c->fd = -1;
        net_free_connection(c);
        return;
    }
    
    int CSocket::net_epoll_add_event(int fd,
                                    int readevent,int writeevent,
                                    uint32_t otherflag, 
                                    uint32_t eventtype, 
                                    lpnet_connection_t c
                                    )
    {
        struct epoll_event ev;
        memset(&ev, 0, sizeof(ev));
    
        if(readevent == 1)
        {
            ev.events = EPOLLIN|EPOLLRDHUP;
        }
        else
        {
    
        }
    
        if(otherflag !=0)
        {
            ev.events |= otherflag;
        }
    
        ev.data.ptr = (void *)( (uintptr_t)c | c->instance);
    
        if(epoll_ctl(m_epollhandle, eventtype, fd, &ev) == -1)
        {
            return -1;
        }
        return 1;
    }
    
    void CSocket::net_event_accept(lpnet_connection_t oldc)
    {
        struct sockaddr     mysockaddr;
        socklen_t           socklen;
        int                 err;
        int                 level;
        int                 s;
        static int          use_accept4 = 1;
    
        lpnet_connection_t   newc;
    
        socklen = sizeof(mysockaddr);
        do
        {
            if(use_accept4)
            {
                s = accept4(oldc->fd, &mysockaddr, &socklen, SOCK_NONBLOCK);
            }
            else
            {
                s = accept(oldc->fd, &mysockaddr, &socklen);
            }
    
            if(s == -1)
            {
                err = errno;
    
                if(err == EAGAIN)
                {
                    return;
                }
    
                if(use_accept4 && err == ENOSYS)
                {
                    use_accept4 = 0;
                    continue;
                }
                return;
            }
    
            newc = net_get_connection(s);
            if(newc == NULL)
            {
                if(close(s) == -1)
                {
    
                }
                return;
            }
    
            memcpy(&newc->s_sockaddr, &mysockaddr, socklen);
    
            if(!use_accept4)
            {
                if(setnonblocking(s))
                {
                    net_close_connection(newc);
                    return;
                }
            }
    
            newc->listening = oldc->listening;
            newc->w_ready = 1;
    
            newc->rhandler = &CSocket::net_wait_request_handler;
            if(net_epoll_add_event(s,
                                    1,0,
                                    0,
                                    EPOLL_CTL_ADD,
                                    newc
                                    ) == -1)
            {
                net_close_connection(newc);
                return;
            }
    
            break;
        }while(1);
    
        return;
    }
    
    int CSocket::net_epoll_process_events(int timer)
    {
        int events = epoll_wait(m_epollhandle, m_events, NGX_MAX_EVENTS, timer);
    
        if(events == -1)
        {
            if(errno == EINTR)
            {
                return 1;
            }
            else
            {
                return 0;
            }
        }
    
        if(events == 0)
        {
            if(timer != -1)
            {
                return 1;
            }
            return 0;
        }
    
        lpnet_connection_t  c;
        uintptr_t           instance;
        uint32_t            revents;
    
        for(int i=0; i < events; ++i)
        {
            c = (lpnet_connection_t)(m_events[i].data.ptr);
            instance = (uintptr_t) c & 1;
    
            c = (lpnet_connection_t) ((uintptr_t)c & (uintptr_t)~1);
    
            if(c->fd == -1)
            {
                continue;
            }
    
            if(c->instance != instance)
            {
                continue;
            }
    
            revents = m_events[i].events;
    
            if(revents & (EPOLLERR|EPOLLHUP))
            {
                revents |= EPOLLIN|EPOLLOUT;
            }
    
            if(revents & EPOLLIN)
            {
                (this->* (c->rhandler))(c);//执行CSocekt::ngx_event_accept(c)
            }
    
            if(revents & EPOLLOUT)
            {
    
            }
        }
    
        return 1;
    }
    
    void CSocket::net_wait_request_handler(lpnet_connection_t c)
    {
        ssize_t reco = recvproc(c, c->precvbuf, c->irecvlen);
    
        if(reco <= 0)
        {
            return;
        }    
    
        //这里收到数据
        printf("recev data:%s", c->precvbuf);
    }
    
    ssize_t CSocket::recvproc(lpnet_connection_t c,char *buff,ssize_t buflen)
    {
        ssize_t n;
    
        n = recv(c->fd, buff, buflen, 0);
    
        if(n == 0)
        {
            net_close_connection(c);
            return -1;
        }
    
        if(n < 0)
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK)
            {
                return -1;
            }
    
            if(errno == EINTR)
            {
                return -1;
            }
    
            net_close_connection(c);
        }
    
        return n;
    }
    

    net.cpp

    #include <stdio.h>
    #include <stdlib.h>
    
    #include "net_socket.h"
    
    CSocket g_socket;
    
    int main(int argc, char *const *argv)
    {
    
        if(g_socket.Initialize() == false)//初始化socket
        {
            exit(2);
        }    
    
        g_socket.net_epoll_init();
    
        for(;;)
        {
            g_socket.net_epoll_process_events(-1);
        }
    
        return 0;
    }
    
    展开全文
  • epoll网络编程实例

    2016-04-14 20:42:44
    在前面已经经过了PPC、TPC、select之类( TPC就是使用进程处理data,TPC就是使用线程处理 ),前面两个的缺点大家应该都是知道的是吧,对于select( 其实poll和他差不多 ),缺点是能同时连接的fd... 对于改进poll的epoll
  • socket epoll网络编程实例

    千次阅读 2019-04-14 20:52:27
    总结了各个博主的经验,写出了简单的demo,实例为ET模式,转载请写明出处,如有宝贵意见请留言。 第一版: 服务端: #include <stdio.h> #include <sys/epoll.h> #include <stdlib.h> #...
  • 在前面已经经过了PPC、TPC、select之类( TPC就是使用进程处理data,TPC就是使用线程处理 ),前面两个的缺点大家应该都是知道的是吧,对于select( 其实poll和他差不多 ),缺点是能同时连接的fd... 对于改进poll的epoll
  • epoll编程

    2018-10-17 00:01:52
    实例编程后续完善各种问题处理,以及定时处理等等: myepoll.c #include &lt;stdio.h&gt; #include &lt;stdlib.h&gt; #include &lt;unistd.h&gt; #include &lt;sys/epoll.h&gt; ...
  • linux epoll多线程编程实例,在Linux下编译通过
  • epoll编程接口(上)

    2019-08-07 17:11:11
    epollAPI的核心数据结构称作epoll实例,它和一个打开的文件描述符相关。这个文件描述符不是用来做IO操作的,相反,它是内核数据结构的句柄,这些内核数据结构实现的目的有两个: 记录了在进程中声明过的感兴趣的...
  • epoll概念及基本函数介绍: 对epoll的详细介绍说明可查看此链接: http://blog.csdn.net/h514434485/article/details/78151090epoll服务端代码实例:#include #include #include #include #include <sys
  • c++ 封装socket和epoll编程

    千次阅读 2015-11-24 16:27:08
    编程实例: CSocket.h文件请参考前一篇文章 CSocketEvent.h #include #include #include #include "CSocket.h" const int MaxEvents = 256; const int EpollSize = 500; class CSocketEvent { public: ...
  • EPOLL简介: Epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序存在并发中只有少量活跃的情况下的系统利用率,因为它会服用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须...
  • 0x00前言 文章中的文字可能存在语法错误以及标点错误,请谅解; 如果在文章中发现代码错误或其它问题请告知,感谢! 0x01 epoll简介 ...epoll在被内核初始化时,会开辟一个高速缓存区,并利用epoll_ create...
  • epoll的使用实例

    2017-12-29 19:50:00
    在网络编程中通常需要处理很多个连接,可以用select和poll来处理多个连接。但是select都受进程能打开的最大文件描述符个数的限制。并且select和poll效率会随着监听fd的数目增多而下降。 解决方法就是用epoll 1....
  • Linux_Epoll介绍和程序实例,帮你了解EPOLL编程
  • Linux epoll和应用实例

    2019-07-14 11:11:10
    1.高性能网路服务器编程用什么技术? Unix/Linux平台:epoll Windows平台:IOCP 2.Linux的epoll函数 epoll用到三个函数: int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_...

空空如也

空空如也

1 2 3 4 5 ... 8
收藏数 141
精华内容 56
关键字:

epoll编程实例