epoll reactor
2017-06-01 02:06:00 weixin_30627341 阅读数 2


a, Handle表示句柄,文件描述符、socket等;
b, EventDemultiplexer表示多路分发机制,调用系统提供的多IO路复用,比如select,epoll。
程序先将关注的句柄注册到EventDemultiplexer,当有相关事件到来触发EventDemultiplexer通知程序。
c, EventHandler定义事件处理方法,
d, Reactor是事件管理的接口,注册和销毁事件,并运行事件循环,当EventDemultiplexer返回Handle有事件"就绪",将其分发给EventHandler上对应的方法。
e, ConcreteEventhandler实现每个事件的处理逻辑。




epoll原理:
struct eventpoll{
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
...
};
每一个epoll对象都有一个独立的eventpoll结构体,当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。

对于每一个事件,都会建立一个epitem结构体
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}


epoll_wait在for循环中检查epitem中有没有已经完成的事件,有的话就把结果返回。没有的话调用schedule_timeout()进入休眠,直到进程被再度唤醒或者超时。


LT模式下,只要一个句柄上的事件一次没有处理完,会在以后调用epoll_wait时次次返回这个句柄,而ET模式仅在第一次返回。
当一个socket句柄上有事件时,内核会把该句柄插入上面所说的准备就绪list链表,这时我们调用epoll_wait,会把准备就绪的socket拷贝到用户态内存,然后清空准备就绪list链表,最后,epoll_wait干了件事,就是检查这些socket,如果不是ET模式(就是LT模式的句柄了),并且这些socket上确实有未处理的事件时,又把该句柄放回到刚刚清空的准备就绪链表了。


struct epoll_event ev,events[20]; //ev用于注册事件,数组用于回传要处理的事件
ev.data.fd=listenfd; //设置与要处理的事件相关的文件描述符
ev.events=EPOLLIN|EPOLLET; //设置要处理的事件类型

epfd=epoll_create(256);
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);



for( ; ; ) {
nfds = epoll_wait(epfd,events,20,500);
for(i=0;i<nfds;++i)
{
if(events[i].data.fd==listenfd) //有新的连接
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept这个连接
ev.data.fd=connfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的fd添加到epoll的监听队列中
}
else if( events[i].events&EPOLLIN ) //接收到数据,读socket
{
sockfd=events[i].data.fd;
n = read(sockfd, line, MAXLINE)) < 0 //读
ev.data.ptr = md; //md为自定义类型,添加数据
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
}
else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
{
struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取数据
sockfd = md->fd;
send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
}
else
{
//其他的处理
}
}
}



转载于:https://www.cnblogs.com/liuhan333/p/6926930.html

epoll reactor 相关内容

2018-08-12 18:44:00 weixin_33724659 阅读数 37

1. 背景

最近在看redis源码,主体流程看完了。
在网上看到了reactor模式,看了一下,其实我们经常使用这种模式。

2. 什么是reactor模式

反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或者多个服务处理程序的事件设计模式。
当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。
简单说,就是如何处理多个客户端的并发请求的解决模式。

3. 一般实现方法

3.1 epoll管理大量客户端(可读、可写)

处理大量客户端请求,不能去挨个轮训,这个要使用epoll。
epoll由于select, poll。这个网上有很多资料。

3.2 请求的单线程处理(redis中的特殊处理)

到达服务端的请求,redis代码中,使用单线程处理。
redis使用就是单线程既要处理连接、也要处理请求,因为redis全内存,避免了线程间加锁、切换等。

3.3 请求的多线程处理(更一般的架构处理)

通常架构中都是采用多线程来处理到达服务器的请求。
将请求放到无锁队列中,处理线程循环来请求任务即可。
因为服务器代码中,可能会访问第三方存储等长耗时处理。用多线程加速。

466768-20180812184033921-924194026.png

4. 参考

epoll reactor 相关内容

2017-01-19 11:43:26 weixin_34258782 阅读数 8

Reactor模式

  • Reactor中文通常被译作“反应堆”,从字面上便透露出了凌厉的霸气,但是不可否认,这个中文的译名除了让人觉得Reactor模式很厉害之外,并没有透露出更多的信息,让人对其理解仍是云里雾里,或许将Reactor看成是'Notifier'+'Dispatcher'的结合体,会更能直接的表达Reactor的工作模式。
  • Wikipedia的定义

    • 对于什么是Reactor模式,不同的机构给出了许多不同的定义,个人最喜欢Wikipedia上的描述,原文如下:
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers

整段描述强调几个关键信息:

  1. Reactor模式是一种事件驱动模式
  2. 一个或多个输入是同时交付的
  3. 服务控制器会分离到达的多个请求并同步的分发给相关的处理器进行处理
  • Reactor模式简图
    reactor模型.png

Reactor模式的组件

  • Reactor模式中包含了Reactor、Handle、EventHandler、Synchronous Event Demutiplexer等必不可少的组成部份,组件间的关系如下所示:
    reactor模式.png

    • Synchronous Event Demutiplexer是同步事件分离器,是IO多路服用技术实现的关键,主要任务是监听系统的Handlers中事件的发生,监听的过程是阻塞等待的
  • Handle是句柄,即操作系统管理的资源

    • Dispatcher是分发器,负责根据Event的类型来调用EventHandler
    • EventHandler是事件处理器,每个事件处理器会关联一个Handle

Reactor模式工作的时序图如下:

reactor时序图.png

  • 流程如下所述:

    • 初始化Dispatcher
    • 注册EventHandler到Dispatcher中,每个EventHandler包含对相应Handle的引用,从而建立Handle到EventHandler的映射
    • 启动Event Loop。在Event Loop中,调用select()方法,Synchronous Event Demultiplexer阻塞等待Event发生
    • 当某个或某些Handle的Event发生后,select()方法返回,Dispatcher根据返回的Event找到注册的EventHandler,并回调该EventHandler的handle_event()方法
    • 在EventHandler的handle_event()方法中还可以向Dispatcher中注册新的EventHandler,用来处理下一个Event

I/O多路复用

  • 指多个描述符(fd)的I/O操作能在一个线程内并发交替地顺序完成,这就叫I/O多路复用,这里的“复用”指的是复用同一个线程。I/O多路复用是reactor模式的核心,I/O多路复用功能由Synchronous Event Demutiplexer提供,而Synchronous Event Demutiplexer是由操作系统实现的。

NIO

  • NIO是Java SDK提供的基于Reactor模式的非阻塞IO工作模式的实现
  • NIO与IO的主要区别包括:

| NIO | IO |
| ------ | ------ |
| 面向缓冲 | 面向流 |
| 非阻塞IO | 阻塞 IO |

面向缓冲与面向流

  • 面向缓冲是NIO与传统IO最大的区别,传统的IO是基于字节的,所有的IO都被看作是单个子节的移动,而NIO是基于块的,一个块则由多个字节组成,从简单的原理上看,NIO的性能提升主要来源于每一次IO操作都能尽可能的读写更多的字节,而更直接的提升是得益于NIO使用的IO读写结构Channel和Buffer非常贴近操作系统执行IO的方式:通道和缓冲器。简单的理解就是越接近操作系统底层,越快速。
  • NIO对Reactor模型组件的实现

    1. Selector : Synchronous Event Demultiplexer
    2. SelectKey : Event
    3. SocketChannel : Handle
    4. (Handlers write by yourself) : EventHandler

#### NIO server demo

package nio.service;
import nio.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NIOService {
   private static final Logger logger = LoggerFactory.getLogger(NIOService.class);
   private static final int TIMEOUT = 30000;
   private static final int PORT = 8084;
   private static final int BLOCK = 4096;
   private static ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);
   private static ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
   public static void main(String[] args) {

       SocketChannel readChannel = null;
       ServerSocketChannel listenChannel = null;

       try {
           //创建一个选择器,用于监听管理事件
           Selector selector = Selector.open();
           //创建服务器socket管道,用来接收和分发socket连接
           listenChannel = ServerSocketChannel.open();
           //设置管道为非阻塞形式
           listenChannel.configureBlocking(false);
           //绑定socket端口
           listenChannel.socket().bind(new InetSocketAddress(PORT));
           //将管道注册到选择器中,注册的事件类型为OP_ACCEPT,现在selector只监听指定端口的OP_ACCEPT事件
           listenChannel.register(selector, SelectionKey.OP_ACCEPT);

           System.out.println("running.......");


           while (true) {
               //监听事件,设置每一次监听的超时数
               if (selector.select(TIMEOUT) == 0) {
                   continue;
               }
               //事件来源列表
               Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
               while (iter.hasNext()) {
                   //获取一个事件
                   SelectionKey key = iter.next();

                   //删除当前事件
                   iter.remove();

                   //检查此键的通道是否已准备好接受新的套接字连接
                   if (key.isAcceptable()) {
                       System.out.println("in acceptable key readyOps : " + key.readyOps());
                       //返回此键的通道
                       ServerSocketChannel server = (ServerSocketChannel) key.channel();
                       //接收套接字连接,返回套接字通道
                       readChannel = server.accept();
                       //配置为非阻塞
                       readChannel.configureBlocking(false);
                       //注册到同一个Selector中
                       readChannel.register(selector, SelectionKey.OP_READ);
                   }
                   if (key.isReadable()) {
                       //返回为之创建的socket通道
                       SocketChannel channel = (SocketChannel) key.channel();
                       //清空buffer
                       receiveBuffer.clear();
                       int count = channel.read(receiveBuffer);
                       if (count > 0) {
                           String receiveText = new String(receiveBuffer.array(), 0, count);
                           //在发送的buffer中存入收到的内容
                           sendBuffer.put(receiveBuffer.array());
                           //设置监听的消息包括写消息
                           key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);

                           System.out.println("in readable key readyOps : " + key.readyOps());
                           System.out.println("receive message : " + receiveText);

                           //在网络不阻塞的情况下,socket都是可写的
                           //保证缓存的可读性
                           sendBuffer.clear();
                           sendBuffer.put(CharsetUtil.encode("server get the input: " + receiveText).array());
                           //保证buffer的可读性
                           sendBuffer.flip();
                           SocketChannel socketChannel = (SocketChannel) key.channel();
                           if (socketChannel.isConnected()) {
                               //do something
                               while (sendBuffer.hasRemaining()) {
                                   if (socketChannel.isConnected()) {
                                       socketChannel.write(sendBuffer);
                                   }
                               }
                           }
                           sendBuffer.clear();
                       } else if (count < 0) {//socket已经断开,count == -1
                           //do nothing
                       }
                   }

               }
           }
       } catch (IOException e) {
           logger.error("nio selector open failure.", e);
       } finally {
           if (readChannel != null) {
               try {
                   readChannel.close();
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }

           if (listenChannel != null) {
               try {
                   listenChannel.close();
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       }
   }
}

epoll

epoll是除了NIO之外另一个对Reactor模式的经典实现,那么什么是epoll?根据man手册的描述:epoll是为了处理大批量句柄(handle)而做了改进的poll。linux 2.5.44以上版本支持,是一个性能极好的I/O多路复用的实现。

epoll对select/poll的改进

  • select/poll 的工作方式是阻塞监听描述符(fd)就绪状态,直到有描述符就绪(有数据可读、可写、或者有except、timeout),则返回fdset,然后通过遍历fdset来找到就绪的描述符。这两种模式之间并没有本质上的区别,poll与select最大的区别在于 poll没有最大文件描述符数量的限制,而select有最大文件描述符数量上的限制。
  • select/poll的缺点在于:当大量文件描述符的数组被复制于用户态和内核的地址空间之间,不论这些文件描述符是否就绪,轮询的开销会随着文件描述符数量的增加而线性增加。
  • epoll 对与select/poll的改进在于,epoll在阻塞监听描述符就绪状态时,仅会返回已经就绪的文件描述符集合,而无需再遍历集合。epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知,这样IO的效率不会随着监视fd的数量的增长而下降。
  • epoll提供了三个可以操作的接口
  1. epoll_create(int size);
  2. epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  3. epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

    
    + int epoll_create(int size)
     + 创建一个epoll的句柄,size是最大的描述符监听数。size并不会真正限制epolll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议值。
    + int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
      + 对指定描述符的监听事件执行指定的Op操作,epfd:是epoll_create()的返回值,fd:是需要监听的文件描述符,epoll_event:是告诉内核需要监听什么事,op:表示操作,有三个宏定义,分别是添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。
      +  int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout) : 等待epfd上的io事件,最多返回maxevents个事件, 该方法与Java NIO的select()方法类似,maxevents的值不能超过epoll_create中的参数size的大小,也就是说,size的大小也间接限制了epoll的线程一次会批量处理几个IO事件。
    + epoll使用的代码demo:
 while(1)
   {
       nfds = epoll_wait(epfd,events,30,600);
       for(i=0;i<nfds;++i)
       {
           if(events[i].data.fd==listenfd) //出现新的连接
           {
               connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //接受这个连接
               ev.data.fd=connfd;
               ev.events=EPOLLIN|EPOLLET;
               epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的描述符添加到epoll的监听队列中
           }

           else if( events[i].events&EPOLLIN ) //接收到数据
           {
               n = read(sockfd, line, MAXLINE)) < 0   
               ev.data.ptr = md;     //添加数据,md为自定义类型my_data的指针
               ev.events=EPOLLOUT|EPOLLET;
               epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据
           }
           else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
           {
               struct my_data* md = (my_data*)events[i].data.ptr;    //读取数据
               sockfd = md->fd;
               send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );     //发送数据
               ev.data.fd=sockfd;
               ev.events=EPOLLIN|EPOLLET;
               epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
           }
           else
           {
               //其他的处理
           }
       }
   }

epoll reactor 相关内容

2017-04-09 22:41:56 rankun1 阅读数 3564

转自:http://blog.csdn.net/analogous_love/article/details/53319815

最近一直在看游双的《高性能Linux服务器编程》一书,下载链接: http://download.csdn.net/detail/analogous_love/9673008

书上是这么介绍Reactor模式的:




按照这个思路,我写个简单的练习:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /**  
  2.  *@desc:   用reactor模式练习服务器程序,main.cpp 
  3.  *@author: zhangyl 
  4.  *@date:   2016.11.23 
  5.  */  
  6.   
  7. #include <iostream>  
  8. #include <string.h>  
  9. #include <sys/types.h>  
  10. #include <sys/socket.h>  
  11. #include <netinet/in.h>  
  12. #include <arpa/inet.h>  //for htonl() and htons()  
  13. #include <unistd.h>  
  14. #include <fcntl.h>  
  15. #include <sys/epoll.h>  
  16. #include <signal.h>     //for signal()  
  17. #include <pthread.h>  
  18. #include <semaphore.h>  
  19. #include <list>  
  20. #include <errno.h>  
  21. #include <time.h>  
  22. #include <sstream>  
  23. #include <iomanip> //for std::setw()/setfill()  
  24. #include <stdlib.h>  
  25.   
  26.   
  27. #define WORKER_THREAD_NUM   5  
  28.   
  29. #define min(a, b) ((a <= b) ? (a) : (b))   
  30.   
  31. int g_epollfd = 0;  
  32. bool g_bStop = false;  
  33. int g_listenfd = 0;  
  34. pthread_t g_acceptthreadid = 0;  
  35. pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };  
  36. pthread_cond_t g_acceptcond;  
  37. pthread_mutex_t g_acceptmutex;  
  38.   
  39. pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;  
  40. pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;  
  41.   
  42. pthread_mutex_t g_clientmutex;  
  43.   
  44. std::list<int> g_listClients;  
  45.   
  46. void prog_exit(int signo)  
  47. {  
  48.     ::signal(SIGINT, SIG_IGN);  
  49.     ::signal(SIGKILL, SIG_IGN);  
  50.     ::signal(SIGTERM, SIG_IGN);  
  51.   
  52.     std::cout << "program recv signal " << signo << " to exit." << std::endl;  
  53.   
  54.     g_bStop = true;  
  55.   
  56.     ::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);  
  57.   
  58.     //TODO: 是否需要先调用shutdown()一下?  
  59.     ::shutdown(g_listenfd, SHUT_RDWR);  
  60.     ::close(g_listenfd);  
  61.     ::close(g_epollfd);  
  62.   
  63.     ::pthread_cond_destroy(&g_acceptcond);  
  64.     ::pthread_mutex_destroy(&g_acceptmutex);  
  65.       
  66.     ::pthread_cond_destroy(&g_cond);  
  67.     ::pthread_mutex_destroy(&g_mutex);  
  68.   
  69.     ::pthread_mutex_destroy(&g_clientmutex);  
  70. }  
  71.   
  72. bool create_server_listener(const char* ip, short port)  
  73. {  
  74.     g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
  75.     if (g_listenfd == -1)  
  76.         return false;  
  77.   
  78.     int on = 1;  
  79.     ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));  
  80.     ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));  
  81.   
  82.     struct sockaddr_in servaddr;  
  83.     memset(&servaddr, 0, sizeof(servaddr));   
  84.     servaddr.sin_family = AF_INET;  
  85.     servaddr.sin_addr.s_addr = inet_addr(ip);  
  86.     servaddr.sin_port = htons(port);  
  87.     if (::bind(g_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)  
  88.         return false;  
  89.   
  90.     if (::listen(g_listenfd, 50) == -1)  
  91.         return false;  
  92.   
  93.     g_epollfd = ::epoll_create(1);  
  94.     if (g_epollfd == -1)  
  95.         return false;  
  96.   
  97.     struct epoll_event e;  
  98.     memset(&e, 0, sizeof(e));  
  99.     e.events = EPOLLIN | EPOLLRDHUP;  
  100.     e.data.fd = g_listenfd;  
  101.     if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)  
  102.         return false;  
  103.   
  104.     return true;  
  105. }  
  106.   
  107. void release_client(int clientfd)  
  108. {  
  109.     if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
  110.         std::cout << "release client socket failed as call epoll_ctl failed" << std::endl;  
  111.   
  112.     ::close(clientfd);  
  113. }  
  114.   
  115. void* accept_thread_func(void* arg)  
  116. {     
  117.     while (!g_bStop)  
  118.     {  
  119.         ::pthread_mutex_lock(&g_acceptmutex);  
  120.         ::pthread_cond_wait(&g_acceptcond, &g_acceptmutex);  
  121.         //::pthread_mutex_lock(&g_acceptmutex);  
  122.   
  123.         //std::cout << "run loop in accept_thread_func" << std::endl;  
  124.   
  125.         struct sockaddr_in clientaddr;  
  126.         socklen_t addrlen;  
  127.         int newfd = ::accept(g_listenfd, (struct sockaddr *)&clientaddr, &addrlen);  
  128.         ::pthread_mutex_unlock(&g_acceptmutex);  
  129.         if (newfd == -1)  
  130.             continue;  
  131.   
  132.         std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;  
  133.   
  134.         //将新socket设置为non-blocking  
  135.         int oldflag = ::fcntl(newfd, F_GETFL, 0);  
  136.         int newflag = oldflag | O_NONBLOCK;  
  137.         if (::fcntl(newfd, F_SETFL, newflag) == -1)  
  138.         {  
  139.             std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;  
  140.             continue;  
  141.         }  
  142.   
  143.         struct epoll_event e;  
  144.         memset(&e, 0, sizeof(e));  
  145.         e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
  146.         e.data.fd = newfd;  
  147.         if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)  
  148.         {  
  149.             std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
  150.         }  
  151.     }  
  152.   
  153.     return NULL;  
  154. }  
  155.   
  156.   
  157. void* worker_thread_func(void* arg)  
  158. {     
  159.     while (!g_bStop)  
  160.     {  
  161.         int clientfd;  
  162.         ::pthread_mutex_lock(&g_clientmutex);  
  163.         while (g_listClients.empty())  
  164.             ::pthread_cond_wait(&g_cond, &g_clientmutex);  
  165.         clientfd = g_listClients.front();  
  166.         g_listClients.pop_front();    
  167.         pthread_mutex_unlock(&g_clientmutex);  
  168.   
  169.         //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来  
  170.         std::cout << std::endl;  
  171.   
  172.         std::string strclientmsg;  
  173.         char buff[256];  
  174.         bool bError = false;  
  175.         while (true)  
  176.         {  
  177.             memset(buff, 0, sizeof(buff));  
  178.             int nRecv = ::recv(clientfd, buff, 256, 0);  
  179.             if (nRecv == -1)  
  180.             {  
  181.                 if (errno == EWOULDBLOCK)  
  182.                     break;  
  183.                 else  
  184.                 {  
  185.                     std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;  
  186.                     release_client(clientfd);  
  187.                     bError = true;  
  188.                     break;  
  189.                 }  
  190.                       
  191.             }  
  192.             //对端关闭了socket,这端也关闭。  
  193.             else if (nRecv == 0)  
  194.             {  
  195.                 std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;  
  196.                 release_client(clientfd);  
  197.                 bError = true;  
  198.                 break;  
  199.             }  
  200.   
  201.             strclientmsg += buff;  
  202.         }  
  203.   
  204.         //出错了,就不要再继续往下执行了  
  205.         if (bError)  
  206.             continue;  
  207.           
  208.         std::cout << "client msg: " << strclientmsg;  
  209.   
  210.         //将消息加上时间标签后发回  
  211.         time_t now = time(NULL);  
  212.         struct tm* nowstr = localtime(&now);  
  213.         std::ostringstream ostimestr;  
  214.         ostimestr << "[" << nowstr->tm_year + 1900 << "-"   
  215.                   << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"   
  216.                   << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "  
  217.                   << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"   
  218.                   << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"   
  219.                   << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";  
  220.   
  221.         strclientmsg.insert(0, ostimestr.str());  
  222.           
  223.         while (true)  
  224.         {  
  225.             int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);  
  226.             if (nSent == -1)  
  227.             {  
  228.                 if (errno == EWOULDBLOCK)  
  229.                 {  
  230.                     ::sleep(10);  
  231.                     continue;  
  232.                 }  
  233.                 else  
  234.                 {  
  235.                     std::cout << "send error, fd = " << clientfd << std::endl;  
  236.                     release_client(clientfd);  
  237.                     break;  
  238.                 }  
  239.                      
  240.             }            
  241.   
  242.             std::cout << "send: " << strclientmsg;  
  243.             strclientmsg.erase(0, nSent);  
  244.   
  245.             if (strclientmsg.empty())  
  246.                 break;  
  247.         }  
  248.     }  
  249.   
  250.     return NULL;  
  251. }  
  252.   
  253. void daemon_run()  
  254. {  
  255.     int pid;  
  256.     signal(SIGCHLD, SIG_IGN);  
  257.     //1)在父进程中,fork返回新创建子进程的进程ID;  
  258.     //2)在子进程中,fork返回0;  
  259.     //3)如果出现错误,fork返回一个负值;  
  260.     pid = fork();  
  261.     if (pid < 0)  
  262.     {  
  263.         std:: cout << "fork error" << std::endl;  
  264.         exit(-1);  
  265.     }  
  266.     //父进程退出,子进程独立运行  
  267.     else if (pid > 0) {  
  268.         exit(0);  
  269.     }  
  270.     //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,  
  271.     //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。  
  272.     //执行setsid()之后,child将重新获得一个新的会话(session)id。  
  273.     //这时parent退出之后,将不会影响到child了。  
  274.     setsid();  
  275.     int fd;  
  276.     fd = open("/dev/null", O_RDWR, 0);  
  277.     if (fd != -1)  
  278.     {  
  279.         dup2(fd, STDIN_FILENO);  
  280.         dup2(fd, STDOUT_FILENO);  
  281.         dup2(fd, STDERR_FILENO);  
  282.     }  
  283.     if (fd > 2)  
  284.         close(fd);  
  285.    
  286. }  
  287.   
  288.   
  289. int main(int argc, char* argv[])  
  290. {    
  291.     short port = 0;  
  292.     int ch;  
  293.     bool bdaemon = false;  
  294.     while ((ch = getopt(argc, argv, "p:d")) != -1)  
  295.     {  
  296.         switch (ch)  
  297.         {  
  298.         case 'd':  
  299.             bdaemon = true;  
  300.             break;  
  301.         case 'p':  
  302.             port = atol(optarg);  
  303.             break;  
  304.         }  
  305.     }  
  306.   
  307.     if (bdaemon)  
  308.         daemon_run();  
  309.   
  310.   
  311.     if (port == 0)  
  312.         port = 12345;  
  313.        
  314.     if (!create_server_listener("0.0.0.0", port))  
  315.     {  
  316.         std::cout << "Unable to create listen server: ip=0.0.0.0, port=" << port << "." << std::endl;  
  317.         return -1;  
  318.     }  
  319.   
  320.       
  321.     //设置信号处理  
  322.     signal(SIGCHLD, SIG_DFL);  
  323.     signal(SIGPIPE, SIG_IGN);  
  324.     signal(SIGINT, prog_exit);  
  325.     signal(SIGKILL, prog_exit);  
  326.     signal(SIGTERM, prog_exit);  
  327.   
  328.     ::pthread_cond_init(&g_acceptcond, NULL);  
  329.     ::pthread_mutex_init(&g_acceptmutex, NULL);  
  330.   
  331.     ::pthread_cond_init(&g_cond, NULL);  
  332.     ::pthread_mutex_init(&g_mutex, NULL);  
  333.   
  334.     ::pthread_mutex_init(&g_clientmutex, NULL);  
  335.        
  336.     ::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);  
  337.     //启动工作线程  
  338.     for (int i = 0; i < WORKER_THREAD_NUM; ++i)  
  339.     {  
  340.         ::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);  
  341.     }  
  342.   
  343.     while (!g_bStop)  
  344.     {         
  345.         struct epoll_event ev[1024];  
  346.         int n = ::epoll_wait(g_epollfd, ev, 1024, 10);  
  347.         if (n == 0)  
  348.             continue;  
  349.         else if (n < 0)  
  350.         {  
  351.             std::cout << "epoll_wait error" << std::endl;  
  352.             continue;  
  353.         }  
  354.   
  355.         int m = min(n, 1024);  
  356.         for (int i = 0; i < m; ++i)  
  357.         {  
  358.             //通知接收连接线程接收新连接  
  359.             if (ev[i].data.fd == g_listenfd)  
  360.                 pthread_cond_signal(&g_acceptcond);  
  361.             //通知普通工作线程接收数据  
  362.             else  
  363.             {                 
  364.                 pthread_mutex_lock(&g_clientmutex);                
  365.                 g_listClients.push_back(ev[i].data.fd);  
  366.                 pthread_mutex_unlock(&g_clientmutex);  
  367.                 pthread_cond_signal(&g_cond);  
  368.                 //std::cout << "signal" << std::endl;  
  369.             }  
  370.                   
  371.         }  
  372.   
  373.     }  
  374.       
  375.     return 0;  
  376. }  


程序的功能一个简单的echo服务:客户端连接上服务器之后,给服务器发送信息,服务器加上时间戳等信息后返回给客户端。使用到的知识点有:

1. 条件变量

2.epoll的边缘触发模式


程序的大致框架是:

1. 主线程只负责监听侦听socket上是否有新连接,如果有新连接到来,交给一个叫accept的工作线程去接收新连接,并将新连接socket绑定到主线程使用epollfd上去。

2. 主线程如果侦听到客户端的socket上有可读事件,则通知另外五个工作线程去接收处理客户端发来的数据,并将数据加上时间戳后发回给客户端。

3. 可以通过传递-p port来设置程序的监听端口号;可以通过传递-d来使程序以daemon模式运行在后台。这也是标准linux daemon模式的书写方法。


程序难点和需要注意的地方是:

1. 条件变量为了防止虚假唤醒,一定要在一个循环里面调用pthread_cond_wait()函数,我在worker_thread_func()中使用了:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. while (g_listClients.empty())  
  2.             ::pthread_cond_wait(&g_cond, &g_clientmutex);  


在accept_thread_func()函数里面我没有使用循环,这样会有问题吗?

2. 使用条件变量pthread_cond_wait()函数的时候一定要先获得与该条件变量相关的mutex,即像下面这样的结构:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. mutex_lock(...);  
  2.   
  3. while (condition is true)  
  4.     ::pthread_cond_wait(...);  
  5.   
  6. //这里可以有其他代码...  
  7. mutex_unlock(...);  
  8.   
  9. //这里可以有其他代码...  

因为pthread_cond_wait()如果阻塞的话,它解锁相关mutex和阻塞当前线程这两个动作加在一起是原子的。


3. 作为服务器端程序最好对侦听socket调用setsocketopt()设置SO_REUSEADDR和SO_REUSEPORT两个标志,因为服务程序有时候会需要重启(比如调试的时候就会不断重启),如果不设置这两个标志的话,绑定端口时就会调用失败。因为一个端口使用后,即使不再使用,因为四次挥手该端口处于TIME_WAIT状态,有大约2min的MSL(Maximum Segment Lifetime,最大存活期)。这2min内,该端口是不能被重复使用的。你的服务器程序上次使用了这个端口号,接着重启,因为这个缘故,你再次绑定这个端口就会失败(bind函数调用失败)。要不你就每次重启时需要等待2min后再试(这在频繁重启程序调试是难以接收的),或者设置这种SO_REUSEADDR和SO_REUSEPORT立即回收端口使用。

其实,SO_REUSEADDR在windows上和Unix平台上还有些细微的区别,我在libevent源码中看到这样的描述:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. int evutil_make_listen_socket_reuseable(evutil_socket_t sock)  
  2. {  
  3. #ifndef WIN32  
  4.     int one = 1;  
  5.     /* REUSEADDR on Unix means, "don't hang on to this address after the 
  6.      * listener is closed."  On Windows, though, it means "don't keep other 
  7.      * processes from binding to this address while we're using it. */  
  8.     return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,  
  9.         (ev_socklen_t)sizeof(one));  
  10. #else  
  11.     return 0;  
  12. #endif  
  13. }  
注意注释部分,在Unix平台上设置这个选项意味着,任意进程可以复用该地址;而在windows,不要阻止其他进程复用该地址。也就是在在Unix平台上,如果不设置这个选项,任意进程在一定时间内,不能bind该地址;在windows平台上,在一定时间内,其他进程不能bind该地址,而本进程却可以再次bind该地址。


4. epoll_wait对新连接socket使用的是边缘触发模式EPOLLET(edge trigger),而不是默认的水平触发模式(level trigger)。因为如果采取水平触发模式的话,主线程检测到某个客户端socket数据可读时,通知工作线程去收取该socket上的数据,这个时候主线程继续循环,只要在工作线程没有将该socket上数据全部收完,或者在工作线程收取数据的过程中,客户端有新数据到来,主线程会继续发通知(通过pthread_cond_signal())函数,再次通知工作线程收取数据。这样会可能导致多个工作线程同时调用recv函数收取该客户端socket上的数据,这样产生的结果将会导致数据错乱。

      相反,采取边缘触发模式,只有等某个工作线程将那个客户端socket上数据全部收取完毕,主线程的epoll_wait才可能会再次触发来通知工作线程继续收取那个客户端socket新来的数据。


5. 代码中有这样一行:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来  
  2.         std::cout << std::endl;  

如果不加上这一行,正常运行服务器程序,程序中要打印到控制台的信息都会打印出来,但是如果用gdb调试状态下,程序的所有输出就不显示了。我不知道这是不是gdb的一个bug,所以这里加上std::endl来输出一个换行符并flush标准输出,让输出显示出来。(std::endl不仅是输出一个换行符而且是同时刷新输出,相当于fflush()函数)。


程序我部署起来了,你可以使用linux的nc命令或自己写程序连接服务器来查看程序效果,当然也可以使用telnet命令,方法:

linux:

nc 120.55.94.78 12345

telnet 120.55.94.78 12345

然后就可以给服务器自由发送数据了,服务器会给你发送的信息加上时间戳返回给你。效果如图:



另外我将这个代码改写了成纯C++11版本,使用CMake编译,为了支持编译必须加上这-std=c++11:

CMakeLists.txt代码如下:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. cmake_minimum_required(VERSION 2.8)  
  2.   
  3. PROJECT(myreactorserver)  
  4.   
  5. AUX_SOURCE_DIRECTORY(./ SRC_LIST)  
  6. SET(EXECUTABLE_OUTPUT_PATH ./)  
  7.   
  8. ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64 -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR -DAC_HAS_CRITICAL -DTIXML_USE_STL -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS} -std=c++11)  
  9.   
  10. INCLUDE_DIRECTORIES(  
  11. ./  
  12. )  
  13. LINK_DIRECTORIES(  
  14. ./  
  15. )  
  16.   
  17. set(  
  18. main.cpp  
  19. myreator.cpp  
  20. )  
  21.   
  22. ADD_EXECUTABLE(myreactorserver ${SRC_LIST})  
  23.   
  24. TARGET_LINK_LIBRARIES(myreactorserver pthread)  

myreactor.h文件内容:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2. *@desc: myreactor头文件, myreactor.h 
  3. *@author: zhangyl 
  4. *@date: 2016.12.03 
  5. */  
  6. #ifndef __MYREACTOR_H__  
  7. #define __MYREACTOR_H__  
  8.   
  9. #include <list>  
  10. #include <memory>  
  11. #include <thread>  
  12. #include <mutex>  
  13. #include <condition_variable>  
  14.   
  15. #define WORKER_THREAD_NUM   5  
  16.   
  17. class CMyReactor  
  18. {  
  19. public:  
  20.     CMyReactor();  
  21.     ~CMyReactor();  
  22.   
  23.     bool init(const char* ip, short nport);  
  24.     bool uninit();  
  25.   
  26.     bool close_client(int clientfd);  
  27.   
  28.     static void* main_loop(void* p);  
  29.   
  30. private:  
  31.     //no copyable  
  32.     CMyReactor(const CMyReactor& rhs);  
  33.     CMyReactor& operator = (const CMyReactor& rhs);  
  34.   
  35.     bool create_server_listener(const char* ip, short port);  
  36.       
  37.     static void accept_thread_proc(CMyReactor* pReatcor);  
  38.     static void worker_thread_proc(CMyReactor* pReatcor);  
  39.   
  40. private:  
  41.     //C11语法可以在这里初始化  
  42.     int                          m_listenfd = 0;  
  43.     int                          m_epollfd  = 0;  
  44.     bool                         m_bStop    = false;  
  45.       
  46.     std::shared_ptr<std::thread> m_acceptthread;  
  47.     std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM];  
  48.       
  49.     std::condition_variable      m_acceptcond;  
  50.     std::mutex                   m_acceptmutex;  
  51.   
  52.     std::condition_variable      m_workercond ;  
  53.     std::mutex                   m_workermutex;  
  54.   
  55.     std::list<int>                 m_listClients;  
  56. };  
  57.   
  58. #endif //!__MYREACTOR_H__  

myreactor.cpp文件内容:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /**  
  2.  *@desc: myreactor实现文件, myreactor.cpp 
  3.  *@author: zhangyl 
  4.  *@date: 2016.12.03 
  5.  */  
  6. #include "myreactor.h"  
  7. #include <iostream>  
  8. #include <string.h>  
  9. #include <sys/types.h>  
  10. #include <sys/socket.h>  
  11. #include <netinet/in.h>  
  12. #include <arpa/inet.h>  //for htonl() and htons()  
  13. #include <fcntl.h>  
  14. #include <sys/epoll.h>  
  15. #include <list>  
  16. #include <errno.h>  
  17. #include <time.h>  
  18. #include <sstream>  
  19. #include <iomanip> //for std::setw()/setfill()  
  20. #include <unistd.h>  
  21.   
  22. #define min(a, b) ((a <= b) ? (a) : (b))  
  23.   
  24. CMyReactor::CMyReactor()  
  25. {  
  26.     //m_listenfd = 0;  
  27.     //m_epollfd = 0;  
  28.     //m_bStop = false;  
  29. }  
  30.   
  31. CMyReactor::~CMyReactor()  
  32. {  
  33.   
  34. }  
  35.   
  36. bool CMyReactor::init(const char* ip, short nport)  
  37. {  
  38.     if (!create_server_listener(ip, nport))  
  39.     {  
  40.         std::cout << "Unable to bind: " << ip << ":" << nport << "." << std::endl;  
  41.         return false;  
  42.     }  
  43.   
  44.   
  45.     std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;  
  46.   
  47.     //启动接收新连接的线程  
  48.     m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this));  
  49.       
  50.     //启动工作线程  
  51.     for (auto& t : m_workerthreads)  
  52.     {  
  53.         t.reset(new std::thread(CMyReactor::worker_thread_proc, this));  
  54.     }  
  55.   
  56.   
  57.     return true;  
  58. }  
  59.   
  60. bool CMyReactor::uninit()  
  61. {  
  62.     m_bStop = true;  
  63.     m_acceptcond.notify_one();  
  64.     m_workercond.notify_all();  
  65.   
  66.     m_acceptthread->join();  
  67.     for (auto& t : m_workerthreads)  
  68.     {  
  69.         t->join();  
  70.     }  
  71.   
  72.     ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL);  
  73.   
  74.     //TODO: 是否需要先调用shutdown()一下?  
  75.     ::shutdown(m_listenfd, SHUT_RDWR);  
  76.     ::close(m_listenfd);  
  77.     ::close(m_epollfd);  
  78.   
  79.     return true;  
  80. }  
  81.   
  82. bool CMyReactor::close_client(int clientfd)  
  83. {  
  84.     if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
  85.     {  
  86.         std::cout << "close client socket failed as call epoll_ctl failed" << std::endl;  
  87.         //return false;  
  88.     }  
  89.           
  90.   
  91.     ::close(clientfd);  
  92.   
  93.     return true;  
  94. }  
  95.   
  96.   
  97. void* CMyReactor::main_loop(void* p)  
  98. {  
  99.     std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;  
  100.       
  101.     CMyReactor* pReatcor = static_cast<CMyReactor*>(p);  
  102.       
  103.     while (!pReatcor->m_bStop)  
  104.     {  
  105.         struct epoll_event ev[1024];  
  106.         int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);  
  107.         if (n == 0)  
  108.             continue;  
  109.         else if (n < 0)  
  110.         {  
  111.             std::cout << "epoll_wait error" << std::endl;  
  112.             continue;  
  113.         }  
  114.   
  115.         int m = min(n, 1024);  
  116.         for (int i = 0; i < m; ++i)  
  117.         {  
  118.             //通知接收连接线程接收新连接  
  119.             if (ev[i].data.fd == pReatcor->m_listenfd)  
  120.                 pReatcor->m_acceptcond.notify_one();  
  121.             //通知普通工作线程接收数据  
  122.             else  
  123.             {  
  124.                 {  
  125.                     std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
  126.                     pReatcor->m_listClients.push_back(ev[i].data.fd);  
  127.                 }  
  128.                                   
  129.                 pReatcor->m_workercond.notify_one();  
  130.                 //std::cout << "signal" << std::endl;  
  131.             }// end if  
  132.   
  133.         }// end for-loop  
  134.     }// end while  
  135.   
  136.     std::cout << "main loop exit ..." << std::endl;  
  137.   
  138.     return NULL;  
  139. }  
  140.   
  141. void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)  
  142. {  
  143.     std::cout << "accept thread, thread id = " << std::this_thread::get_id() << std::endl;  
  144.   
  145.     while (true)  
  146.     {  
  147.         int newfd;  
  148.         struct sockaddr_in clientaddr;  
  149.         socklen_t addrlen;  
  150.         {  
  151.             std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);  
  152.             pReatcor->m_acceptcond.wait(guard);  
  153.             if (pReatcor->m_bStop)  
  154.                 break;  
  155.   
  156.             //std::cout << "run loop in accept_thread_proc" << std::endl;  
  157.               
  158.             newfd = ::accept(pReatcor->m_listenfd, (struct sockaddr *)&clientaddr, &addrlen);  
  159.         }  
  160.         if (newfd == -1)  
  161.             continue;  
  162.   
  163.         std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;  
  164.   
  165.         //将新socket设置为non-blocking  
  166.         int oldflag = ::fcntl(newfd, F_GETFL, 0);  
  167.         int newflag = oldflag | O_NONBLOCK;  
  168.         if (::fcntl(newfd, F_SETFL, newflag) == -1)  
  169.         {  
  170.             std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;  
  171.             continue;  
  172.         }  
  173.   
  174.         struct epoll_event e;  
  175.         memset(&e, 0, sizeof(e));  
  176.         e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
  177.         e.data.fd = newfd;  
  178.         if (::epoll_ctl(pReatcor->m_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)  
  179.         {  
  180.             std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
  181.         }  
  182.     }  
  183.   
  184.     std::cout << "accept thread exit ..." << std::endl;  
  185. }  
  186.   
  187. void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)  
  188. {  
  189.     std::cout << "new worker thread, thread id = " << std::this_thread::get_id() << std::endl;  
  190.   
  191.     while (true)  
  192.     {  
  193.         int clientfd;  
  194.         {  
  195.             std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
  196.             while (pReatcor->m_listClients.empty())  
  197.             {  
  198.                 if (pReatcor->m_bStop)  
  199.                 {  
  200.                     std::cout << "worker thread exit ..." << std::endl;  
  201.                     return;  
  202.                 }  
  203.                       
  204.                 pReatcor->m_workercond.wait(guard);  
  205.             }  
  206.                   
  207.             clientfd = pReatcor->m_listClients.front();  
  208.             pReatcor->m_listClients.pop_front();  
  209.         }  
  210.   
  211.         //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来  
  212.         std::cout << std::endl;  
  213.   
  214.         std::string strclientmsg;  
  215.         char buff[256];  
  216.         bool bError = false;  
  217.         while (true)  
  218.         {  
  219.             memset(buff, 0, sizeof(buff));  
  220.             int nRecv = ::recv(clientfd, buff, 256, 0);  
  221.             if (nRecv == -1)  
  222.             {  
  223.                 if (errno == EWOULDBLOCK)  
  224.                     break;  
  225.                 else  
  226.                 {  
  227.                     std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;  
  228.                     pReatcor->close_client(clientfd);  
  229.                     bError = true;  
  230.                     break;  
  231.                 }  
  232.   
  233.             }  
  234.             //对端关闭了socket,这端也关闭。  
  235.             else if (nRecv == 0)  
  236.             {  
  237.                 std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;  
  238.                 pReatcor->close_client(clientfd);  
  239.                 bError = true;  
  240.                 break;  
  241.             }  
  242.   
  243.             strclientmsg += buff;  
  244.         }  
  245.   
  246.         //出错了,就不要再继续往下执行了  
  247.         if (bError)  
  248.             continue;  
  249.   
  250.         std::cout << "client msg: " << strclientmsg;  
  251.   
  252.         //将消息加上时间标签后发回  
  253.         time_t now = time(NULL);  
  254.         struct tm* nowstr = localtime(&now);  
  255.         std::ostringstream ostimestr;  
  256.         ostimestr << "[" << nowstr->tm_year + 1900 << "-"  
  257.             << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"  
  258.             << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "  
  259.             << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"  
  260.             << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"  
  261.             << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";  
  262.   
  263.         strclientmsg.insert(0, ostimestr.str());  
  264.   
  265.         while (true)  
  266.         {  
  267.             int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);  
  268.             if (nSent == -1)  
  269.             {  
  270.                 if (errno == EWOULDBLOCK)  
  271.                 {  
  272.                     std::this_thread::sleep_for(std::chrono::milliseconds(10));  
  273.                     continue;  
  274.                 }  
  275.                 else  
  276.                 {  
  277.                     std::cout << "send error, fd = " << clientfd << std::endl;  
  278.                     pReatcor->close_client(clientfd);  
  279.                     break;  
  280.                 }  
  281.   
  282.             }  
  283.   
  284.             std::cout << "send: " << strclientmsg;  
  285.             strclientmsg.erase(0, nSent);  
  286.   
  287.             if (strclientmsg.empty())  
  288.                 break;  
  289.         }  
  290.     }  
  291. }  
  292.   
  293. bool CMyReactor::create_server_listener(const char* ip, short port)  
  294. {  
  295.     m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
  296.     if (m_listenfd == -1)  
  297.         return false;  
  298.   
  299.     int on = 1;  
  300.     ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));  
  301.     ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));  
  302.   
  303.     struct sockaddr_in servaddr;  
  304.     memset(&servaddr, 0, sizeof(servaddr));  
  305.     servaddr.sin_family = AF_INET;  
  306.     servaddr.sin_addr.s_addr = inet_addr(ip);  
  307.     servaddr.sin_port = htons(port);  
  308.     if (::bind(m_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)  
  309.         return false;  
  310.   
  311.     if (::listen(m_listenfd, 50) == -1)  
  312.         return false;  
  313.   
  314.     m_epollfd = ::epoll_create(1);  
  315.     if (m_epollfd == -1)  
  316.         return false;  
  317.   
  318.     struct epoll_event e;  
  319.     memset(&e, 0, sizeof(e));  
  320.     e.events = EPOLLIN | EPOLLRDHUP;  
  321.     e.data.fd = m_listenfd;  
  322.     if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1)  
  323.         return false;  
  324.   
  325.     return true;  
  326. }  

main.cpp文件内容:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /**  
  2.  *@desc:   用reactor模式练习服务器程序 
  3.  *@author: zhangyl 
  4.  *@date:   2016.12.03 
  5.  */  
  6.   
  7. #include <iostream>  
  8. #include <signal.h>     //for signal()  
  9. #include<unistd.h>  
  10. #include <stdlib.h>       //for exit()  
  11. #include <sys/types.h>  
  12. #include <sys/stat.h>  
  13. #include <fcntl.h>  
  14. #include "myreactor.h"  
  15.   
  16. CMyReactor g_reator;  
  17.   
  18. void prog_exit(int signo)  
  19. {  
  20.     std::cout << "program recv signal " << signo << " to exit." << std::endl;   
  21.   
  22.     g_reator.uninit();  
  23. }  
  24.   
  25. void daemon_run()  
  26. {  
  27.     int pid;  
  28.     signal(SIGCHLD, SIG_IGN);  
  29.     //1)在父进程中,fork返回新创建子进程的进程ID;  
  30.     //2)在子进程中,fork返回0;  
  31.     //3)如果出现错误,fork返回一个负值;  
  32.     pid = fork();  
  33.     if (pid < 0)  
  34.     {  
  35.         std:: cout << "fork error" << std::endl;  
  36.         exit(-1);  
  37.     }  
  38.     //父进程退出,子进程独立运行  
  39.     else if (pid > 0) {  
  40.         exit(0);  
  41.     }  
  42.     //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,  
  43.     //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。  
  44.     //执行setsid()之后,child将重新获得一个新的会话(session)id。  
  45.     //这时parent退出之后,将不会影响到child了。  
  46.     setsid();  
  47.     int fd;  
  48.     fd = open("/dev/null", O_RDWR, 0);  
  49.     if (fd != -1)  
  50.     {  
  51.         dup2(fd, STDIN_FILENO);  
  52.         dup2(fd, STDOUT_FILENO);  
  53.         dup2(fd, STDERR_FILENO);  
  54.     }  
  55.     if (fd > 2)  
  56.         close(fd);  
  57. }  
  58.   
  59.   
  60. int main(int argc, char* argv[])  
  61. {    
  62.     //设置信号处理  
  63.     signal(SIGCHLD, SIG_DFL);  
  64.     signal(SIGPIPE, SIG_IGN);  
  65.     signal(SIGINT, prog_exit);  
  66.     signal(SIGKILL, prog_exit);  
  67.     signal(SIGTERM, prog_exit);  
  68.       
  69.     short port = 0;  
  70.     int ch;  
  71.     bool bdaemon = false;  
  72.     while ((ch = getopt(argc, argv, "p:d")) != -1)  
  73.     {  
  74.         switch (ch)  
  75.         {  
  76.         case 'd':  
  77.             bdaemon = true;  
  78.             break;  
  79.         case 'p':  
  80.             port = atol(optarg);  
  81.             break;  
  82.         }  
  83.     }  
  84.   
  85.     if (bdaemon)  
  86.         daemon_run();  
  87.   
  88.   
  89.     if (port == 0)  
  90.         port = 12345;  
  91.   
  92.       
  93.     if (!g_reator.init("0.0.0.0", 12345))  
  94.         return -1;  
  95.       
  96.     g_reator.main_loop(&g_reator);  
  97.   
  98.     return 0;  
  99. }  

完整实例代码下载地址:

普通版本:https://pan.baidu.com/s/1o82Mkno

C++11版本:https://pan.baidu.com/s/1dEJdrih


epoll reactor 相关内容

2016-09-08 09:36:19 w451062810 阅读数 1790
/*
 *epoll基于非阻塞I/O事件驱动
 * 反应堆模型 
 */
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>


#define MAX_EVENTS  1024                                    //监听上限数
#define BUFLEN 4096                                          //从管道中读取的最大字节数
#define SERV_PORT   8080                                    //默认端口


//读管道
void recvdata(int fd, int events, void *arg);
//写管道
void senddata(int fd, int events, void *arg);


/* 描述就绪文件描述符相关信息 */
struct myevent_s {
    int fd;                                                 //要监听的文件描述符
    int events;                                             //对应的监听事件
    void *arg;                                              //泛型参数
    void (*call_back)(int fd, int events, void *arg);       //回调函数
    int status;                                             //是否在监听:1->在红黑树上(监听), 0->不在(不监听)
    char buf[BUFLEN]; //存放读到的内容
    int len; //读到的长度
    long last_active;                                       //记录每次加入红黑树 g_efd 的时间值
};


int g_efd;                                                  //全局变量, 保存epoll_create返回的文件描述符
struct myevent_s g_events[MAX_EVENTS+1];                    //自定义结构体类型数组. +1-->listen fd(最后一个元素存放监听套接字),方便管理套接字








/* -------------------------------------------*/
/**
* @brief  eventset 
* @function 数组元素的初始化
*
* @ev  数组元素地址
* @fd   要监听的文件描述符
* @call_back 回调函数
* @arg 参数
*
 -------------------------------------------*/


void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
{
    ev->fd = fd; 
    ev->call_back = call_back;
    ev->events = 0;//对应的监听事件
    ev->arg = arg; //自己指向自己
    ev->status = 0;//是否在监听:1->在红黑树上(监听), 0->不在(不监听)
    //memset(ev->buf, 0, sizeof(ev->buf));
    //ev->len = 0;
    ev->last_active = time(NULL);                       //调用eventset函数的时间


    return;
}


/* -------------------------------------------*/
/**
* @brief  eventadd 
* @function 事件的添加或修改
*
* @efd  epoll句柄
* @events   要监听或修改的事件
* @ev 数组元素地址
*
 -------------------------------------------*/
void eventadd(int efd, int events, struct myevent_s *ev)
{
    struct epoll_event epv = {0, {0}};//需要监听事件的结构体
    int op; //EPOLL_CTL_MOD(修改树中节点属性),EPOLL_CTL_ADD(向树中插入节点)
    epv.data.ptr = ev;     //指向一个 myevent_s结构体变量
    epv.events = ev->events = events;  //监听事件赋值 ,EPOLLIN 或 EPOLLOUT


    if (ev->status == 1) {           //已经在红黑树 g_efd 里
        op = EPOLL_CTL_MOD;            //修改其属性
    } else {                            //不在红黑树里
        op = EPOLL_CTL_ADD;              //将其加入红黑树 g_efd, 并将status置1
        ev->status = 1;
    }


    if (epoll_ctl(efd, op, ev->fd, &epv) < 0)                      
        printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
    else
        printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);


    return ;
}




/* -------------------------------------------*/
/**
* @brief  eventdel 
* @function 将事件冲epoll中摘除
*
* @efd  epoll句柄
* @ev   数组元素指针
*
 -------------------------------------------*/
void eventdel(int efd, struct myevent_s *ev)
{
    struct epoll_event epv = {0, {0}};


    if (ev->status != 1)                                        //不在红黑树上
        return ;


    epv.data.ptr = ev;
    ev->status = 0;                                             //修改状态
    epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv);                //从红黑树 efd 上将 ev->fd 摘除


    return ;
}


/* -------------------------------------------*/
/**
* @brief  acceptconn 
* @function 获取客户端套接字,加入eopll树和全局数组中
*
* @lfd  套接字
* @events   当前响应的事件类型
* @arg 指向数组元素的指针
*
 -------------------------------------------*/
void acceptconn(int lfd, int events, void *arg)
{
    struct sockaddr_in cin;
    socklen_t len = sizeof(cin);
    int cfd, i;
    //获取客户端套接字
    if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) {
        if (errno != EAGAIN && errno != EINTR) {
            /* 暂时不做出错处理 */
        }
        printf("%s: accept, %s\n", __func__, strerror(errno));
        return ;
    }


    do {
    //查找自定义数组中的空闲位置
        for (i = 0; i < MAX_EVENTS; i++)                                //从全局数组g_events中找一个空闲元素
            if (g_events[i].status == 0)                                //类似于select中找值为-1的元素
                break;                                                  //跳出 for
        //超出数组的最大限制
        if (i == MAX_EVENTS) {
            printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);
            break;                                                      //跳出do while(0) 不执行后续代码
        }
       
        int flag = 0;
        if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) {             //将cfd也设置为非阻塞
            printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno));
            break;
        }


        //将客户端套接字放入全局数组
        eventset(&g_events[i], cfd, recvdata, &g_events[i]);   
         //将cfd添加到红黑树g_efd中,监听读事件
        eventadd(g_efd, EPOLLIN, &g_events[i]);                        


    } while(0);


    printf("new connect [%s:%d][time:%ld], pos[%d]\n", 
            inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);
    return ;
}


/* -------------------------------------------*/
/**
* @brief  recvdata 
* @function 响应读事件的回调函数,将客户端描述符的监听事件改变
*
* @fd  客户端套接字
* @events   当前响应的事件类型
* @arg 指向数组元素的指针
*
 -------------------------------------------*/
void recvdata(int fd, int events, void *arg)
{
    struct myevent_s *ev = (struct myevent_s *)arg;
    int len;


    //读文件描述符, 数据存入myevent_s成员buf中
    len = recv(fd, ev->buf, sizeof(ev->buf), 0);            


    eventdel(g_efd, ev);        //将该节点从红黑树上摘除


    if (len > 0) {


        ev->len = len;
        ev->buf[len] = '\0';                                //手动添加字符串结束标记
        printf("C[%d]:%s\n", fd, ev->buf);


        //设置该 fd 对应的回调函数为 senddata
        eventset(ev, fd, senddata, ev);                    


        //将fd加入红黑树g_efd中,监听其写事件,当套接字可以写的时候,触发epoll_wait返回
        eventadd(g_efd, EPOLLOUT, ev);                      


    } else if (len == 0) {
        close(ev->fd);
        /* ev-g_events 地址相减得到偏移元素位置 */
        printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events);
    } else {
        close(ev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }


    return;
}




/* -------------------------------------------*/
/**
* @brief  senddata 
* @function 向客户端发送信息
*
* @fd  客户端套接字
* @events   当前响应的事件类型
* @arg 指向数组元素的指针
*
 -------------------------------------------*/
void senddata(int fd, int events, void *arg)
{
    struct myevent_s *ev = (struct myevent_s *)arg;
    int len;


    len = send(fd, ev->buf, ev->len, 0);                    //直接将数据 回写给客户端。未作处理
  
    if (len > 0) {


        printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);
        eventdel(g_efd, ev);                                //从红黑树g_efd中移除
        eventset(ev, fd, recvdata, ev);                     //将该fd的 回调函数改为 recvdata
        eventadd(g_efd, EPOLLIN, ev);                       //从新添加到红黑树上, 设为监听读事件


    } else {
        close(ev->fd);                                      //关闭链接
        eventdel(g_efd, ev);                                //从红黑树g_efd中移除
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
    }


    return ;
}




/* -------------------------------------------*/
/**
* @brief  initlistensocket 
* @function 服务端套接字初始化,将套接字加入epoll树中
*
* @efd  epoll句柄
* @port 待绑定的端口号
*
 -------------------------------------------*/
void initlistensocket(int efd, short port)
{
//创建监听套接字
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    //设置监听套接字属性为套接字属性为非阻塞
    fcntl(lfd, F_SETFL, O_NONBLOCK);                            


    //结构体最后一个元素初始化,将listenfd信息存放
    eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]);


    //将listenfd加入epoll数中
    eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]); 


    struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));                                               //bzero(&sin, sizeof(sin))
sin.sin_family = AF_INET; // ipv4 地址
sin.sin_addr.s_addr = htonl(INADDR_ANY); //任意ip
sin.sin_port = htons(port);
     //绑定ip和端口,将主动套接字变为被动连接套接字
bind(lfd, (struct sockaddr *)&sin, sizeof(sin));
//监听,最大等待队列20,创建2个队列,一个是已经完成3次握手的队列,一个是正在等待3次握手完成的队列
listen(lfd, 20);


    return ;
}


int main(int argc, char *argv[])
{
//使用用户指定端口.如未指定,用默认端口
    unsigned short port = SERV_PORT;
    if (argc == 2)
        port = atoi(argv[1]);                         
   
   
    //创建epoll句柄
    g_efd = epoll_create(MAX_EVENTS+1);                 
    if (g_efd <= 0)
        printf("create efd in %s err %s\n", __func__, strerror(errno));


    //服务端套接字初始化,将套接字加入epoll树中
    initlistensocket(g_efd, port);                      


    //创建已经满足就绪事件的文件描述符数组 
  struct epoll_event events[MAX_EVENTS+1];            
printf("server running:port[%d]\n", port);


    int checkpos = 0, i;//一次循环检测100个。 使用checkpos控制检测对象
    while (1) {


        /* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭此客户端链接 */
        long now = time(NULL);                          //当前时间
        for (i = 0; i < 100; i++, checkpos++) {         //一次循环检测100个。 使用checkpos控制检测对象
            if (checkpos == MAX_EVENTS)//过滤listenfd
                checkpos = 0;
            if (g_events[checkpos].status != 1)         //不在红黑树 g_efd 上
                continue;
            long duration = now - g_events[checkpos].last_active;       //客户端不活跃的世间


            if (duration >= 60) {
                close(g_events[checkpos].fd);                           //关闭与该客户端链接
                printf("[fd=%d] timeout\n", g_events[checkpos].fd);
                //g_efd树根  ,结构体地址
                eventdel(g_efd, &g_events[checkpos]);                   //将该客户端 从红黑树 g_efd移除,并将自定义数组中的status置为0
            }
        }
        //关闭连接结束


  


     /* -------------------------------------------*/
/*
* @brief  epoll_wait 
* @function 非阻塞等待1秒钟监控事件的响应,响应成功,将事件结构体放入events中
* @return nfd 代表返回就绪的文件描述符
*
-------------------------------------------*/
        int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);
        if (nfd < 0) {
            printf("epoll_wait error, exit\n");
            break;
        }


        for (i = 0; i < nfd; i++) {
            /*使用自定义结构体myevent_s类型指针, 接收 联合体data的void *ptr成员*/
            struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr;  
           
            if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {           //读就绪事件
            //调用回调函数
                ev->call_back(ev->fd, events[i].events, ev->arg);
            }
            if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {         //写就绪事件
                ev->call_back(ev->fd, events[i].events, ev->arg);
            }
        }//end for


    }
    /* 退出前释放所有资源 */
    return 0;
}



epoll reactor 相关内容

没有更多推荐了,返回首页