app实现页面跳转 react
2018-01-01 20:11:00 weixin_33692284 阅读数 200

参考来源

实现

  • C++实现
#include<stdio.h>
 
#include<stdlib.h>
 
#include<unistd.h>
 
#include<errno.h>
 
#include<netdb.h>
 
#include<sys/types.h>
 
#include<sys/socket.h>
 
#include<netinet/in.h>
 
#include<arpa/inet.h>
 
#include<netdb.h>
 
#include<sys/time.h>
 
#include<string.h>
 
#include<sys/select.h>
 
#include<pthread.h>
 
 
 
 
/*************************
*用于返回最大文件描述值
*
*************************/
int max_fd(int a[], int num)
{
    int max = -1;
    int i = 0;
    for(i = 0; i < num; i++)
    {
        if(a[i] > max)
        {
            max = a[i];
 
        }
    }
 
    return max;
}
 
int main(int argc, char *argv[])
{
    int sockfd = 0;
    int confd = 0;
    int i = 0, j = 0;
    fd_set fd_read[2];
    int clifd[FD_SETSIZE]; /*存放监听以及已与客户连接的套接字*/
    int fd_count = 0; /*已经连接的套接字个数*/
    struct sockaddr_in seradd, cliadd;
    int fd_ret = 0;
    socklen_t cli_len = sizeof(cliadd);
    char readbuf[1024];
    char writebuf[1024];
    /*create sock;*/
    sockfd = socket(AF_INET, SOCK_STREAM, 0); /*此套接字是阻塞的*/
    if(sockfd < 0)
    {
        perror("sock create fail !!");
        exit(-1);
    }
 
 
    seradd.sin_family = AF_INET;
    seradd.sin_port = htons(8080);
    seradd.sin_addr.s_addr = htonl(INADDR_ANY);
 
    /*bind*/
    if(-1 == (bind(sockfd, (struct sockaddr *)&seradd, sizeof(struct sockaddr))))
    {
        perror("bind error");
        exit(-1);
    }
 
    /*listen*/
    if(-1 == (listen(sockfd, 5)))
    {
        perror("listen error");
        exit(-1);
    }
 
    //memset(&fd_read[0],0,sizeof(fd_read[0]));
 
    FD_ZERO(&fd_read[0]); /*清空fd_read[0]所有位*/
    FD_SET(sockfd, &fd_read[0]); /*将sockfd添加到fd_read[0]描述集中,也就是说将sockfd对应的fd_read[0]位中置位*/
    for(i = 0; i < FD_SETSIZE; i++)
    {
        clifd[i] = -1;
    }
 
    clifd[0] = sockfd;
    printf("--ser start work---\n");
    while(1)
    {
        FD_ZERO(&fd_read[1]);
        fd_read[1] = fd_read[0]; /*每次select后,没有达到条件的描述将被清空,所以每次都需要重新赋值*/
        /*进程将阻塞在select函数处,直到在整个队列中有读描述符可用为止,个人理解--内核检测整个队列,然后将可用的描述符返回(一个或多个,一个没有时将阻塞)*/
        fd_ret = select(max_fd(clifd, FD_SETSIZE) + 1, &fd_read[1], NULL, NULL, NULL); /*我们只关心读描述符集*/
 
        if(fd_ret < 0)
        {
            perror("select error");
        }
        else if(fd_ret > 0)
        {
            /*是监听套接字可读*/
            if(FD_ISSET(sockfd, &fd_read[1]) && (fd_count < FD_SETSIZE - 1))
            {
 
                confd = accept(sockfd, (struct sockaddr *)&cliadd, &cli_len); /*获取与客户端连接套接字*/
                if(-1 == confd)
                {
                    perror("confd error");
                }
 
                for(i = 1; i < FD_SETSIZE; i++)
                {
                    if(clifd[i] == -1)
                    {
                        clifd[i] = confd; /*将获得新连接套接字放到clifd数组中*/
                        FD_SET(confd, &fd_read[0]); /*将获得新连接套接字添加到读描述集中*/
                        fd_count++;
                        break;
                    }
                }
            }
 
            /*连接套接字可读*/
            for(j = 1; j < FD_SETSIZE; j++)
            {
                if(FD_ISSET(clifd[j], &fd_read[1]))
                {
                    /*从clifd[i]套接字中读取数据*/
                    if(read(clifd[j], readbuf, sizeof(readbuf)) <= 0)
                    {
                        perror("read data error");
                        FD_CLR(clifd[j], &fd_read[0]); /*将clifd[j]描述从读描述符集中删除*/
                        close(clifd[j]); /*关闭该套接字*/
                        clifd[j] = -1;
                        fd_count--;
                        continue;
 
                    }
 
                    strcpy(writebuf, readbuf);
                    printf("read data:%s\n", readbuf);
                    if(write(clifd[j], writebuf, sizeof(writebuf)) <= 0)
                    {
                        perror("write data error");
                        FD_CLR(clifd[j], &fd_read[0]);
                        close(clifd[j]);
                        clifd[j] = -1;
                        fd_count--;
                        continue;
                    }
                }
            }
        }
 
 
    }
 
}
  • Java实现
  • Reactor
public class Reactor implements Runnable {

    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port);
        serverSocket.socket().bind(address);
        serverSocket.configureBlocking(false);
        //向selector注册该channel
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("-->Start serverSocket.register!");

        //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
        sk.attach(new Acceptor());
        System.out.println("-->attach(new Acceptor()!");
    }


    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
                while (it.hasNext()) {
                    //来一个事件 第一次触发一个accepter线程
                    //以后触发SocketReadHandler
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex) {
            System.out.println("reactor stop!" + ex);
        }
    }

    //运行Acceptor或SocketReadHandler
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null) {
            r.run();
        }
    }

    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                System.out.println("-->ready for accept!");
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    //调用Handler来处理channel
                    new Handler(selector, c);
            } catch (IOException ex) {
            }
        }
    }
}
  • Handler
public class Handler implements Runnable {

    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(Integer.MAX_VALUE);
    ByteBuffer output = ByteBuffer.allocate(Integer.MAX_VALUE);
    static final int READING = 0, SENDING = 1;
    int state = READING;


    public Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        //设置为非阻塞模式
        c.configureBlocking(false);
        //此处的0,表示不关注任何时间
        sk = socket.register(sel, 0);
        //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法
        sk.attach(this);
        //将SelectionKey标记为可读,以便读取,不可关注可写事件
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() {
        return false;
    }

    boolean outputIsComplete() {
        return false;
    }

    //这里可以通过线程池处理数据
    void process() {

    }


    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) { /* ... */ }

    }


    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            //
            sk.cancel();
        }
    }

}
2018-08-04 09:36:09 qq_42606051 阅读数 694

NIO 常用的编程模型是 Reactor,在 Doug Lea 的 Scalable IO in Java 的 PPT 中对其进行了介绍,文末有福利 :) ,Reactor 的特点是 I/O 多路复用和事件驱动,基本处理过程为:

  • 处理程序声明感兴趣的 I/O 事件,这些事件表示在特定套接字上准备读取的情况
  • 事件通知器等待事件
  • 一个事件发生并唤醒通知器,通知器调用适当的处理程序
  • 事件处理程序执行实际的读取操作,并进行处理,然后重新声明关注的 I/O 事件,并将控制权返回给调度程序

其中通知器,就是 Selector。Reactor模型主要有以下几种版本:

  • 单线程Reactor,单线程处理器
  • 单线程Reactor,多线程处理器
  • 多线程主从Reactor,单线程处理器
  • 多线程主从Reactor,多线程处理器

单线程版本

单线程版本

核心代码:

 public void run() {
    try { 
      if      (state == READING) read(); 
      else if (state == SENDING) send();
    } catch (IOException ex) { /* ... */ }
  }
  void read() throws IOException {
    socket.read(input);
    if (inputIsComplete()) {
       process(); 
       state = SENDING; 
       // Normally also do first write now
       sk.interestOps(SelectionKey.OP_WRITE);
    }
  }
  void send() throws IOException {
    socket.write(output);
    if (outputIsComplete()) sk.cancel();
  } 

单线程的特点是:只有一个 Reactor 线程,即只有一个 Selector 事件通知器,也就是说,字节的读取 I/O 和后续的业务处理(process() 方法),均由 Reactor 线程来做,很显然业务的处理影响后续事件的分发,所以引出多线程版本进行优化。

多线程版本

多线程版本

核心代码:

static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
  socket.read(input);
  if (inputIsComplete()) {
    state = PROCESSING;
    pool.execute(new Processer());
  }
}
synchronized void processAndHandOff() {
  process();
  state = SENDING; // or rebind attachment
  sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
  public void run() { processAndHandOff(); }
}

多线程版本的特点是:一个 Reactor 线程和多个处理线程,将业务处理(process 交给线程池)进行了分离,Reactor 线程,只关注事件分发和字节的发送和读取(I/O)。注意,实际的发送和读取还是由 Reactor 处理,那么在高并发下,有可能连接来不及接收,继续优化,采用主从 Reactor。

主从 Reactor

主从 Reactor

核心代码:

Selector[] selectors; // also create threads
int next = 0;
class Acceptor { // ... 
  public synchronized void run() { ...
    Socket connection = serverSocket.accept();
    if (connection != null)
      new Handler(selectors[next], connection);
    if (++next == selectors.length) next = 0;
  }
}

主从 Reactor 特点是:使用一个 Selector 池,通常有一个 主Reactor 用于处理接收连接事件,多个 从Reactor 处理实际的 I/O,整体来看,分工合作,分而治之,非常高效。

在真正实现时,有些细节需要注意,完整代码下载https://github.com/rmwheel/reactor
代码有详细注释,看完绝对能理解 Reactor,其中包含对 Doug Lea 的 Scalable IO in Java 的 翻译,欢迎 star :)

本文由 wskwbog 创作,采用 知识共享4.0 许可证 - 署名-非商业性使用-禁止演绎
本站文章除注明转载/出处外,均为本站原创或翻译
微信公众号:拆轮子 欢迎关注.

郑州检查男科疾病去哪家医院

郑州妇科在线咨询

郑州看男科多少钱

郑州不孕不育正规医院

2019-07-14 18:05:11 liuyuan185442111 阅读数 11

下面是一个reactor模式下的io简单类图:

reactor::run是一个死循环,单独占用一个线程,循环体内容是:
调用load_event(),加载自上次以来变化的事件,函数会调用_dispatch的add_event()和del_event(),将要监听的fd和该fd上关心的事件告知demultiplexer;
调用_dispatch->dispatch()进入wait状态,等待事件发生或超时。

passiveio_event和activeio_event的作用是建立tcp连接,即创建streamio_event对象,然后将其加入reactor。
在已建立的连接上有事件发生时,找到其对应的streamio_event对象,根据是读还是写调用handle_read()或handle_write()。
在handle_read()或handle_write()内部,则委托给session *_ses负责具体的逻辑。

一个session对应一个连接,它的内部可能会有接收和发送缓冲,加解密等一些组件;session_manager组织了多个session,调用session_manager的send时,需要指定一个sid,send会找到对应session,session在做了一些工作后,向reactor添加可以发送的事件,这会唤醒demultiplexer::dispatch(),然后去发送数据。

reactor包含demuliplexer和多个event,但调用demuliplexer::dispatch()时又会把reactor传过去,event内部也有指向reactor的指针,这三个类耦合比较紧密,但好在它们整体对外解耦了。
session内部有指向session_manager的指针,session_manager管理着session,在我看来这样实在不好。在看wdb代码时,page类内部有指向pagecache类的指针,我通过把b树的merge、split等操作从page移到pagecache,使得page类不再需要使用pagecache的方法了,但这里却是不好将session和session_manager解耦。

2017-04-09 22:41:56 rankun1 阅读数 3765

转自:http://blog.csdn.net/analogous_love/article/details/53319815

最近一直在看游双的《高性能Linux服务器编程》一书,下载链接: http://download.csdn.net/detail/analogous_love/9673008

书上是这么介绍Reactor模式的:




按照这个思路,我写个简单的练习:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /**  
  2.  *@desc:   用reactor模式练习服务器程序,main.cpp 
  3.  *@author: zhangyl 
  4.  *@date:   2016.11.23 
  5.  */  
  6.   
  7. #include <iostream>  
  8. #include <string.h>  
  9. #include <sys/types.h>  
  10. #include <sys/socket.h>  
  11. #include <netinet/in.h>  
  12. #include <arpa/inet.h>  //for htonl() and htons()  
  13. #include <unistd.h>  
  14. #include <fcntl.h>  
  15. #include <sys/epoll.h>  
  16. #include <signal.h>     //for signal()  
  17. #include <pthread.h>  
  18. #include <semaphore.h>  
  19. #include <list>  
  20. #include <errno.h>  
  21. #include <time.h>  
  22. #include <sstream>  
  23. #include <iomanip> //for std::setw()/setfill()  
  24. #include <stdlib.h>  
  25.   
  26.   
  27. #define WORKER_THREAD_NUM   5  
  28.   
  29. #define min(a, b) ((a <= b) ? (a) : (b))   
  30.   
  31. int g_epollfd = 0;  
  32. bool g_bStop = false;  
  33. int g_listenfd = 0;  
  34. pthread_t g_acceptthreadid = 0;  
  35. pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };  
  36. pthread_cond_t g_acceptcond;  
  37. pthread_mutex_t g_acceptmutex;  
  38.   
  39. pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;  
  40. pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;  
  41.   
  42. pthread_mutex_t g_clientmutex;  
  43.   
  44. std::list<int> g_listClients;  
  45.   
  46. void prog_exit(int signo)  
  47. {  
  48.     ::signal(SIGINT, SIG_IGN);  
  49.     ::signal(SIGKILL, SIG_IGN);  
  50.     ::signal(SIGTERM, SIG_IGN);  
  51.   
  52.     std::cout << "program recv signal " << signo << " to exit." << std::endl;  
  53.   
  54.     g_bStop = true;  
  55.   
  56.     ::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);  
  57.   
  58.     //TODO: 是否需要先调用shutdown()一下?  
  59.     ::shutdown(g_listenfd, SHUT_RDWR);  
  60.     ::close(g_listenfd);  
  61.     ::close(g_epollfd);  
  62.   
  63.     ::pthread_cond_destroy(&g_acceptcond);  
  64.     ::pthread_mutex_destroy(&g_acceptmutex);  
  65.       
  66.     ::pthread_cond_destroy(&g_cond);  
  67.     ::pthread_mutex_destroy(&g_mutex);  
  68.   
  69.     ::pthread_mutex_destroy(&g_clientmutex);  
  70. }  
  71.   
  72. bool create_server_listener(const char* ip, short port)  
  73. {  
  74.     g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
  75.     if (g_listenfd == -1)  
  76.         return false;  
  77.   
  78.     int on = 1;  
  79.     ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));  
  80.     ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));  
  81.   
  82.     struct sockaddr_in servaddr;  
  83.     memset(&servaddr, 0, sizeof(servaddr));   
  84.     servaddr.sin_family = AF_INET;  
  85.     servaddr.sin_addr.s_addr = inet_addr(ip);  
  86.     servaddr.sin_port = htons(port);  
  87.     if (::bind(g_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)  
  88.         return false;  
  89.   
  90.     if (::listen(g_listenfd, 50) == -1)  
  91.         return false;  
  92.   
  93.     g_epollfd = ::epoll_create(1);  
  94.     if (g_epollfd == -1)  
  95.         return false;  
  96.   
  97.     struct epoll_event e;  
  98.     memset(&e, 0, sizeof(e));  
  99.     e.events = EPOLLIN | EPOLLRDHUP;  
  100.     e.data.fd = g_listenfd;  
  101.     if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)  
  102.         return false;  
  103.   
  104.     return true;  
  105. }  
  106.   
  107. void release_client(int clientfd)  
  108. {  
  109.     if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
  110.         std::cout << "release client socket failed as call epoll_ctl failed" << std::endl;  
  111.   
  112.     ::close(clientfd);  
  113. }  
  114.   
  115. void* accept_thread_func(void* arg)  
  116. {     
  117.     while (!g_bStop)  
  118.     {  
  119.         ::pthread_mutex_lock(&g_acceptmutex);  
  120.         ::pthread_cond_wait(&g_acceptcond, &g_acceptmutex);  
  121.         //::pthread_mutex_lock(&g_acceptmutex);  
  122.   
  123.         //std::cout << "run loop in accept_thread_func" << std::endl;  
  124.   
  125.         struct sockaddr_in clientaddr;  
  126.         socklen_t addrlen;  
  127.         int newfd = ::accept(g_listenfd, (struct sockaddr *)&clientaddr, &addrlen);  
  128.         ::pthread_mutex_unlock(&g_acceptmutex);  
  129.         if (newfd == -1)  
  130.             continue;  
  131.   
  132.         std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;  
  133.   
  134.         //将新socket设置为non-blocking  
  135.         int oldflag = ::fcntl(newfd, F_GETFL, 0);  
  136.         int newflag = oldflag | O_NONBLOCK;  
  137.         if (::fcntl(newfd, F_SETFL, newflag) == -1)  
  138.         {  
  139.             std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;  
  140.             continue;  
  141.         }  
  142.   
  143.         struct epoll_event e;  
  144.         memset(&e, 0, sizeof(e));  
  145.         e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
  146.         e.data.fd = newfd;  
  147.         if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)  
  148.         {  
  149.             std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
  150.         }  
  151.     }  
  152.   
  153.     return NULL;  
  154. }  
  155.   
  156.   
  157. void* worker_thread_func(void* arg)  
  158. {     
  159.     while (!g_bStop)  
  160.     {  
  161.         int clientfd;  
  162.         ::pthread_mutex_lock(&g_clientmutex);  
  163.         while (g_listClients.empty())  
  164.             ::pthread_cond_wait(&g_cond, &g_clientmutex);  
  165.         clientfd = g_listClients.front();  
  166.         g_listClients.pop_front();    
  167.         pthread_mutex_unlock(&g_clientmutex);  
  168.   
  169.         //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来  
  170.         std::cout << std::endl;  
  171.   
  172.         std::string strclientmsg;  
  173.         char buff[256];  
  174.         bool bError = false;  
  175.         while (true)  
  176.         {  
  177.             memset(buff, 0, sizeof(buff));  
  178.             int nRecv = ::recv(clientfd, buff, 256, 0);  
  179.             if (nRecv == -1)  
  180.             {  
  181.                 if (errno == EWOULDBLOCK)  
  182.                     break;  
  183.                 else  
  184.                 {  
  185.                     std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;  
  186.                     release_client(clientfd);  
  187.                     bError = true;  
  188.                     break;  
  189.                 }  
  190.                       
  191.             }  
  192.             //对端关闭了socket,这端也关闭。  
  193.             else if (nRecv == 0)  
  194.             {  
  195.                 std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;  
  196.                 release_client(clientfd);  
  197.                 bError = true;  
  198.                 break;  
  199.             }  
  200.   
  201.             strclientmsg += buff;  
  202.         }  
  203.   
  204.         //出错了,就不要再继续往下执行了  
  205.         if (bError)  
  206.             continue;  
  207.           
  208.         std::cout << "client msg: " << strclientmsg;  
  209.   
  210.         //将消息加上时间标签后发回  
  211.         time_t now = time(NULL);  
  212.         struct tm* nowstr = localtime(&now);  
  213.         std::ostringstream ostimestr;  
  214.         ostimestr << "[" << nowstr->tm_year + 1900 << "-"   
  215.                   << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"   
  216.                   << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "  
  217.                   << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"   
  218.                   << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"   
  219.                   << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";  
  220.   
  221.         strclientmsg.insert(0, ostimestr.str());  
  222.           
  223.         while (true)  
  224.         {  
  225.             int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);  
  226.             if (nSent == -1)  
  227.             {  
  228.                 if (errno == EWOULDBLOCK)  
  229.                 {  
  230.                     ::sleep(10);  
  231.                     continue;  
  232.                 }  
  233.                 else  
  234.                 {  
  235.                     std::cout << "send error, fd = " << clientfd << std::endl;  
  236.                     release_client(clientfd);  
  237.                     break;  
  238.                 }  
  239.                      
  240.             }            
  241.   
  242.             std::cout << "send: " << strclientmsg;  
  243.             strclientmsg.erase(0, nSent);  
  244.   
  245.             if (strclientmsg.empty())  
  246.                 break;  
  247.         }  
  248.     }  
  249.   
  250.     return NULL;  
  251. }  
  252.   
  253. void daemon_run()  
  254. {  
  255.     int pid;  
  256.     signal(SIGCHLD, SIG_IGN);  
  257.     //1)在父进程中,fork返回新创建子进程的进程ID;  
  258.     //2)在子进程中,fork返回0;  
  259.     //3)如果出现错误,fork返回一个负值;  
  260.     pid = fork();  
  261.     if (pid < 0)  
  262.     {  
  263.         std:: cout << "fork error" << std::endl;  
  264.         exit(-1);  
  265.     }  
  266.     //父进程退出,子进程独立运行  
  267.     else if (pid > 0) {  
  268.         exit(0);  
  269.     }  
  270.     //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,  
  271.     //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。  
  272.     //执行setsid()之后,child将重新获得一个新的会话(session)id。  
  273.     //这时parent退出之后,将不会影响到child了。  
  274.     setsid();  
  275.     int fd;  
  276.     fd = open("/dev/null", O_RDWR, 0);  
  277.     if (fd != -1)  
  278.     {  
  279.         dup2(fd, STDIN_FILENO);  
  280.         dup2(fd, STDOUT_FILENO);  
  281.         dup2(fd, STDERR_FILENO);  
  282.     }  
  283.     if (fd > 2)  
  284.         close(fd);  
  285.    
  286. }  
  287.   
  288.   
  289. int main(int argc, char* argv[])  
  290. {    
  291.     short port = 0;  
  292.     int ch;  
  293.     bool bdaemon = false;  
  294.     while ((ch = getopt(argc, argv, "p:d")) != -1)  
  295.     {  
  296.         switch (ch)  
  297.         {  
  298.         case 'd':  
  299.             bdaemon = true;  
  300.             break;  
  301.         case 'p':  
  302.             port = atol(optarg);  
  303.             break;  
  304.         }  
  305.     }  
  306.   
  307.     if (bdaemon)  
  308.         daemon_run();  
  309.   
  310.   
  311.     if (port == 0)  
  312.         port = 12345;  
  313.        
  314.     if (!create_server_listener("0.0.0.0", port))  
  315.     {  
  316.         std::cout << "Unable to create listen server: ip=0.0.0.0, port=" << port << "." << std::endl;  
  317.         return -1;  
  318.     }  
  319.   
  320.       
  321.     //设置信号处理  
  322.     signal(SIGCHLD, SIG_DFL);  
  323.     signal(SIGPIPE, SIG_IGN);  
  324.     signal(SIGINT, prog_exit);  
  325.     signal(SIGKILL, prog_exit);  
  326.     signal(SIGTERM, prog_exit);  
  327.   
  328.     ::pthread_cond_init(&g_acceptcond, NULL);  
  329.     ::pthread_mutex_init(&g_acceptmutex, NULL);  
  330.   
  331.     ::pthread_cond_init(&g_cond, NULL);  
  332.     ::pthread_mutex_init(&g_mutex, NULL);  
  333.   
  334.     ::pthread_mutex_init(&g_clientmutex, NULL);  
  335.        
  336.     ::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);  
  337.     //启动工作线程  
  338.     for (int i = 0; i < WORKER_THREAD_NUM; ++i)  
  339.     {  
  340.         ::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);  
  341.     }  
  342.   
  343.     while (!g_bStop)  
  344.     {         
  345.         struct epoll_event ev[1024];  
  346.         int n = ::epoll_wait(g_epollfd, ev, 1024, 10);  
  347.         if (n == 0)  
  348.             continue;  
  349.         else if (n < 0)  
  350.         {  
  351.             std::cout << "epoll_wait error" << std::endl;  
  352.             continue;  
  353.         }  
  354.   
  355.         int m = min(n, 1024);  
  356.         for (int i = 0; i < m; ++i)  
  357.         {  
  358.             //通知接收连接线程接收新连接  
  359.             if (ev[i].data.fd == g_listenfd)  
  360.                 pthread_cond_signal(&g_acceptcond);  
  361.             //通知普通工作线程接收数据  
  362.             else  
  363.             {                 
  364.                 pthread_mutex_lock(&g_clientmutex);                
  365.                 g_listClients.push_back(ev[i].data.fd);  
  366.                 pthread_mutex_unlock(&g_clientmutex);  
  367.                 pthread_cond_signal(&g_cond);  
  368.                 //std::cout << "signal" << std::endl;  
  369.             }  
  370.                   
  371.         }  
  372.   
  373.     }  
  374.       
  375.     return 0;  
  376. }  


程序的功能一个简单的echo服务:客户端连接上服务器之后,给服务器发送信息,服务器加上时间戳等信息后返回给客户端。使用到的知识点有:

1. 条件变量

2.epoll的边缘触发模式


程序的大致框架是:

1. 主线程只负责监听侦听socket上是否有新连接,如果有新连接到来,交给一个叫accept的工作线程去接收新连接,并将新连接socket绑定到主线程使用epollfd上去。

2. 主线程如果侦听到客户端的socket上有可读事件,则通知另外五个工作线程去接收处理客户端发来的数据,并将数据加上时间戳后发回给客户端。

3. 可以通过传递-p port来设置程序的监听端口号;可以通过传递-d来使程序以daemon模式运行在后台。这也是标准linux daemon模式的书写方法。


程序难点和需要注意的地方是:

1. 条件变量为了防止虚假唤醒,一定要在一个循环里面调用pthread_cond_wait()函数,我在worker_thread_func()中使用了:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. while (g_listClients.empty())  
  2.             ::pthread_cond_wait(&g_cond, &g_clientmutex);  


在accept_thread_func()函数里面我没有使用循环,这样会有问题吗?

2. 使用条件变量pthread_cond_wait()函数的时候一定要先获得与该条件变量相关的mutex,即像下面这样的结构:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. mutex_lock(...);  
  2.   
  3. while (condition is true)  
  4.     ::pthread_cond_wait(...);  
  5.   
  6. //这里可以有其他代码...  
  7. mutex_unlock(...);  
  8.   
  9. //这里可以有其他代码...  

因为pthread_cond_wait()如果阻塞的话,它解锁相关mutex和阻塞当前线程这两个动作加在一起是原子的。


3. 作为服务器端程序最好对侦听socket调用setsocketopt()设置SO_REUSEADDR和SO_REUSEPORT两个标志,因为服务程序有时候会需要重启(比如调试的时候就会不断重启),如果不设置这两个标志的话,绑定端口时就会调用失败。因为一个端口使用后,即使不再使用,因为四次挥手该端口处于TIME_WAIT状态,有大约2min的MSL(Maximum Segment Lifetime,最大存活期)。这2min内,该端口是不能被重复使用的。你的服务器程序上次使用了这个端口号,接着重启,因为这个缘故,你再次绑定这个端口就会失败(bind函数调用失败)。要不你就每次重启时需要等待2min后再试(这在频繁重启程序调试是难以接收的),或者设置这种SO_REUSEADDR和SO_REUSEPORT立即回收端口使用。

其实,SO_REUSEADDR在windows上和Unix平台上还有些细微的区别,我在libevent源码中看到这样的描述:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. int evutil_make_listen_socket_reuseable(evutil_socket_t sock)  
  2. {  
  3. #ifndef WIN32  
  4.     int one = 1;  
  5.     /* REUSEADDR on Unix means, "don't hang on to this address after the 
  6.      * listener is closed."  On Windows, though, it means "don't keep other 
  7.      * processes from binding to this address while we're using it. */  
  8.     return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,  
  9.         (ev_socklen_t)sizeof(one));  
  10. #else  
  11.     return 0;  
  12. #endif  
  13. }  
注意注释部分,在Unix平台上设置这个选项意味着,任意进程可以复用该地址;而在windows,不要阻止其他进程复用该地址。也就是在在Unix平台上,如果不设置这个选项,任意进程在一定时间内,不能bind该地址;在windows平台上,在一定时间内,其他进程不能bind该地址,而本进程却可以再次bind该地址。


4. epoll_wait对新连接socket使用的是边缘触发模式EPOLLET(edge trigger),而不是默认的水平触发模式(level trigger)。因为如果采取水平触发模式的话,主线程检测到某个客户端socket数据可读时,通知工作线程去收取该socket上的数据,这个时候主线程继续循环,只要在工作线程没有将该socket上数据全部收完,或者在工作线程收取数据的过程中,客户端有新数据到来,主线程会继续发通知(通过pthread_cond_signal())函数,再次通知工作线程收取数据。这样会可能导致多个工作线程同时调用recv函数收取该客户端socket上的数据,这样产生的结果将会导致数据错乱。

      相反,采取边缘触发模式,只有等某个工作线程将那个客户端socket上数据全部收取完毕,主线程的epoll_wait才可能会再次触发来通知工作线程继续收取那个客户端socket新来的数据。


5. 代码中有这样一行:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来  
  2.         std::cout << std::endl;  

如果不加上这一行,正常运行服务器程序,程序中要打印到控制台的信息都会打印出来,但是如果用gdb调试状态下,程序的所有输出就不显示了。我不知道这是不是gdb的一个bug,所以这里加上std::endl来输出一个换行符并flush标准输出,让输出显示出来。(std::endl不仅是输出一个换行符而且是同时刷新输出,相当于fflush()函数)。


程序我部署起来了,你可以使用linux的nc命令或自己写程序连接服务器来查看程序效果,当然也可以使用telnet命令,方法:

linux:

nc 120.55.94.78 12345

telnet 120.55.94.78 12345

然后就可以给服务器自由发送数据了,服务器会给你发送的信息加上时间戳返回给你。效果如图:



另外我将这个代码改写了成纯C++11版本,使用CMake编译,为了支持编译必须加上这-std=c++11:

CMakeLists.txt代码如下:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. cmake_minimum_required(VERSION 2.8)  
  2.   
  3. PROJECT(myreactorserver)  
  4.   
  5. AUX_SOURCE_DIRECTORY(./ SRC_LIST)  
  6. SET(EXECUTABLE_OUTPUT_PATH ./)  
  7.   
  8. ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64 -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR -DAC_HAS_CRITICAL -DTIXML_USE_STL -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS} -std=c++11)  
  9.   
  10. INCLUDE_DIRECTORIES(  
  11. ./  
  12. )  
  13. LINK_DIRECTORIES(  
  14. ./  
  15. )  
  16.   
  17. set(  
  18. main.cpp  
  19. myreator.cpp  
  20. )  
  21.   
  22. ADD_EXECUTABLE(myreactorserver ${SRC_LIST})  
  23.   
  24. TARGET_LINK_LIBRARIES(myreactorserver pthread)  

myreactor.h文件内容:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2. *@desc: myreactor头文件, myreactor.h 
  3. *@author: zhangyl 
  4. *@date: 2016.12.03 
  5. */  
  6. #ifndef __MYREACTOR_H__  
  7. #define __MYREACTOR_H__  
  8.   
  9. #include <list>  
  10. #include <memory>  
  11. #include <thread>  
  12. #include <mutex>  
  13. #include <condition_variable>  
  14.   
  15. #define WORKER_THREAD_NUM   5  
  16.   
  17. class CMyReactor  
  18. {  
  19. public:  
  20.     CMyReactor();  
  21.     ~CMyReactor();  
  22.   
  23.     bool init(const char* ip, short nport);  
  24.     bool uninit();  
  25.   
  26.     bool close_client(int clientfd);  
  27.   
  28.     static void* main_loop(void* p);  
  29.   
  30. private:  
  31.     //no copyable  
  32.     CMyReactor(const CMyReactor& rhs);  
  33.     CMyReactor& operator = (const CMyReactor& rhs);  
  34.   
  35.     bool create_server_listener(const char* ip, short port);  
  36.       
  37.     static void accept_thread_proc(CMyReactor* pReatcor);  
  38.     static void worker_thread_proc(CMyReactor* pReatcor);  
  39.   
  40. private:  
  41.     //C11语法可以在这里初始化  
  42.     int                          m_listenfd = 0;  
  43.     int                          m_epollfd  = 0;  
  44.     bool                         m_bStop    = false;  
  45.       
  46.     std::shared_ptr<std::thread> m_acceptthread;  
  47.     std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM];  
  48.       
  49.     std::condition_variable      m_acceptcond;  
  50.     std::mutex                   m_acceptmutex;  
  51.   
  52.     std::condition_variable      m_workercond ;  
  53.     std::mutex                   m_workermutex;  
  54.   
  55.     std::list<int>                 m_listClients;  
  56. };  
  57.   
  58. #endif //!__MYREACTOR_H__  

myreactor.cpp文件内容:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /**  
  2.  *@desc: myreactor实现文件, myreactor.cpp 
  3.  *@author: zhangyl 
  4.  *@date: 2016.12.03 
  5.  */  
  6. #include "myreactor.h"  
  7. #include <iostream>  
  8. #include <string.h>  
  9. #include <sys/types.h>  
  10. #include <sys/socket.h>  
  11. #include <netinet/in.h>  
  12. #include <arpa/inet.h>  //for htonl() and htons()  
  13. #include <fcntl.h>  
  14. #include <sys/epoll.h>  
  15. #include <list>  
  16. #include <errno.h>  
  17. #include <time.h>  
  18. #include <sstream>  
  19. #include <iomanip> //for std::setw()/setfill()  
  20. #include <unistd.h>  
  21.   
  22. #define min(a, b) ((a <= b) ? (a) : (b))  
  23.   
  24. CMyReactor::CMyReactor()  
  25. {  
  26.     //m_listenfd = 0;  
  27.     //m_epollfd = 0;  
  28.     //m_bStop = false;  
  29. }  
  30.   
  31. CMyReactor::~CMyReactor()  
  32. {  
  33.   
  34. }  
  35.   
  36. bool CMyReactor::init(const char* ip, short nport)  
  37. {  
  38.     if (!create_server_listener(ip, nport))  
  39.     {  
  40.         std::cout << "Unable to bind: " << ip << ":" << nport << "." << std::endl;  
  41.         return false;  
  42.     }  
  43.   
  44.   
  45.     std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;  
  46.   
  47.     //启动接收新连接的线程  
  48.     m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this));  
  49.       
  50.     //启动工作线程  
  51.     for (auto& t : m_workerthreads)  
  52.     {  
  53.         t.reset(new std::thread(CMyReactor::worker_thread_proc, this));  
  54.     }  
  55.   
  56.   
  57.     return true;  
  58. }  
  59.   
  60. bool CMyReactor::uninit()  
  61. {  
  62.     m_bStop = true;  
  63.     m_acceptcond.notify_one();  
  64.     m_workercond.notify_all();  
  65.   
  66.     m_acceptthread->join();  
  67.     for (auto& t : m_workerthreads)  
  68.     {  
  69.         t->join();  
  70.     }  
  71.   
  72.     ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL);  
  73.   
  74.     //TODO: 是否需要先调用shutdown()一下?  
  75.     ::shutdown(m_listenfd, SHUT_RDWR);  
  76.     ::close(m_listenfd);  
  77.     ::close(m_epollfd);  
  78.   
  79.     return true;  
  80. }  
  81.   
  82. bool CMyReactor::close_client(int clientfd)  
  83. {  
  84.     if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
  85.     {  
  86.         std::cout << "close client socket failed as call epoll_ctl failed" << std::endl;  
  87.         //return false;  
  88.     }  
  89.           
  90.   
  91.     ::close(clientfd);  
  92.   
  93.     return true;  
  94. }  
  95.   
  96.   
  97. void* CMyReactor::main_loop(void* p)  
  98. {  
  99.     std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;  
  100.       
  101.     CMyReactor* pReatcor = static_cast<CMyReactor*>(p);  
  102.       
  103.     while (!pReatcor->m_bStop)  
  104.     {  
  105.         struct epoll_event ev[1024];  
  106.         int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);  
  107.         if (n == 0)  
  108.             continue;  
  109.         else if (n < 0)  
  110.         {  
  111.             std::cout << "epoll_wait error" << std::endl;  
  112.             continue;  
  113.         }  
  114.   
  115.         int m = min(n, 1024);  
  116.         for (int i = 0; i < m; ++i)  
  117.         {  
  118.             //通知接收连接线程接收新连接  
  119.             if (ev[i].data.fd == pReatcor->m_listenfd)  
  120.                 pReatcor->m_acceptcond.notify_one();  
  121.             //通知普通工作线程接收数据  
  122.             else  
  123.             {  
  124.                 {  
  125.                     std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
  126.                     pReatcor->m_listClients.push_back(ev[i].data.fd);  
  127.                 }  
  128.                                   
  129.                 pReatcor->m_workercond.notify_one();  
  130.                 //std::cout << "signal" << std::endl;  
  131.             }// end if  
  132.   
  133.         }// end for-loop  
  134.     }// end while  
  135.   
  136.     std::cout << "main loop exit ..." << std::endl;  
  137.   
  138.     return NULL;  
  139. }  
  140.   
  141. void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)  
  142. {  
  143.     std::cout << "accept thread, thread id = " << std::this_thread::get_id() << std::endl;  
  144.   
  145.     while (true)  
  146.     {  
  147.         int newfd;  
  148.         struct sockaddr_in clientaddr;  
  149.         socklen_t addrlen;  
  150.         {  
  151.             std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);  
  152.             pReatcor->m_acceptcond.wait(guard);  
  153.             if (pReatcor->m_bStop)  
  154.                 break;  
  155.   
  156.             //std::cout << "run loop in accept_thread_proc" << std::endl;  
  157.               
  158.             newfd = ::accept(pReatcor->m_listenfd, (struct sockaddr *)&clientaddr, &addrlen);  
  159.         }  
  160.         if (newfd == -1)  
  161.             continue;  
  162.   
  163.         std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;  
  164.   
  165.         //将新socket设置为non-blocking  
  166.         int oldflag = ::fcntl(newfd, F_GETFL, 0);  
  167.         int newflag = oldflag | O_NONBLOCK;  
  168.         if (::fcntl(newfd, F_SETFL, newflag) == -1)  
  169.         {  
  170.             std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;  
  171.             continue;  
  172.         }  
  173.   
  174.         struct epoll_event e;  
  175.         memset(&e, 0, sizeof(e));  
  176.         e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
  177.         e.data.fd = newfd;  
  178.         if (::epoll_ctl(pReatcor->m_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)  
  179.         {  
  180.             std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
  181.         }  
  182.     }  
  183.   
  184.     std::cout << "accept thread exit ..." << std::endl;  
  185. }  
  186.   
  187. void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)  
  188. {  
  189.     std::cout << "new worker thread, thread id = " << std::this_thread::get_id() << std::endl;  
  190.   
  191.     while (true)  
  192.     {  
  193.         int clientfd;  
  194.         {  
  195.             std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
  196.             while (pReatcor->m_listClients.empty())  
  197.             {  
  198.                 if (pReatcor->m_bStop)  
  199.                 {  
  200.                     std::cout << "worker thread exit ..." << std::endl;  
  201.                     return;  
  202.                 }  
  203.                       
  204.                 pReatcor->m_workercond.wait(guard);  
  205.             }  
  206.                   
  207.             clientfd = pReatcor->m_listClients.front();  
  208.             pReatcor->m_listClients.pop_front();  
  209.         }  
  210.   
  211.         //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来  
  212.         std::cout << std::endl;  
  213.   
  214.         std::string strclientmsg;  
  215.         char buff[256];  
  216.         bool bError = false;  
  217.         while (true)  
  218.         {  
  219.             memset(buff, 0, sizeof(buff));  
  220.             int nRecv = ::recv(clientfd, buff, 256, 0);  
  221.             if (nRecv == -1)  
  222.             {  
  223.                 if (errno == EWOULDBLOCK)  
  224.                     break;  
  225.                 else  
  226.                 {  
  227.                     std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;  
  228.                     pReatcor->close_client(clientfd);  
  229.                     bError = true;  
  230.                     break;  
  231.                 }  
  232.   
  233.             }  
  234.             //对端关闭了socket,这端也关闭。  
  235.             else if (nRecv == 0)  
  236.             {  
  237.                 std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;  
  238.                 pReatcor->close_client(clientfd);  
  239.                 bError = true;  
  240.                 break;  
  241.             }  
  242.   
  243.             strclientmsg += buff;  
  244.         }  
  245.   
  246.         //出错了,就不要再继续往下执行了  
  247.         if (bError)  
  248.             continue;  
  249.   
  250.         std::cout << "client msg: " << strclientmsg;  
  251.   
  252.         //将消息加上时间标签后发回  
  253.         time_t now = time(NULL);  
  254.         struct tm* nowstr = localtime(&now);  
  255.         std::ostringstream ostimestr;  
  256.         ostimestr << "[" << nowstr->tm_year + 1900 << "-"  
  257.             << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"  
  258.             << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "  
  259.             << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"  
  260.             << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"  
  261.             << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";  
  262.   
  263.         strclientmsg.insert(0, ostimestr.str());  
  264.   
  265.         while (true)  
  266.         {  
  267.             int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);  
  268.             if (nSent == -1)  
  269.             {  
  270.                 if (errno == EWOULDBLOCK)  
  271.                 {  
  272.                     std::this_thread::sleep_for(std::chrono::milliseconds(10));  
  273.                     continue;  
  274.                 }  
  275.                 else  
  276.                 {  
  277.                     std::cout << "send error, fd = " << clientfd << std::endl;  
  278.                     pReatcor->close_client(clientfd);  
  279.                     break;  
  280.                 }  
  281.   
  282.             }  
  283.   
  284.             std::cout << "send: " << strclientmsg;  
  285.             strclientmsg.erase(0, nSent);  
  286.   
  287.             if (strclientmsg.empty())  
  288.                 break;  
  289.         }  
  290.     }  
  291. }  
  292.   
  293. bool CMyReactor::create_server_listener(const char* ip, short port)  
  294. {  
  295.     m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
  296.     if (m_listenfd == -1)  
  297.         return false;  
  298.   
  299.     int on = 1;  
  300.     ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));  
  301.     ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));  
  302.   
  303.     struct sockaddr_in servaddr;  
  304.     memset(&servaddr, 0, sizeof(servaddr));  
  305.     servaddr.sin_family = AF_INET;  
  306.     servaddr.sin_addr.s_addr = inet_addr(ip);  
  307.     servaddr.sin_port = htons(port);  
  308.     if (::bind(m_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)  
  309.         return false;  
  310.   
  311.     if (::listen(m_listenfd, 50) == -1)  
  312.         return false;  
  313.   
  314.     m_epollfd = ::epoll_create(1);  
  315.     if (m_epollfd == -1)  
  316.         return false;  
  317.   
  318.     struct epoll_event e;  
  319.     memset(&e, 0, sizeof(e));  
  320.     e.events = EPOLLIN | EPOLLRDHUP;  
  321.     e.data.fd = m_listenfd;  
  322.     if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1)  
  323.         return false;  
  324.   
  325.     return true;  
  326. }  

main.cpp文件内容:

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /**  
  2.  *@desc:   用reactor模式练习服务器程序 
  3.  *@author: zhangyl 
  4.  *@date:   2016.12.03 
  5.  */  
  6.   
  7. #include <iostream>  
  8. #include <signal.h>     //for signal()  
  9. #include<unistd.h>  
  10. #include <stdlib.h>       //for exit()  
  11. #include <sys/types.h>  
  12. #include <sys/stat.h>  
  13. #include <fcntl.h>  
  14. #include "myreactor.h"  
  15.   
  16. CMyReactor g_reator;  
  17.   
  18. void prog_exit(int signo)  
  19. {  
  20.     std::cout << "program recv signal " << signo << " to exit." << std::endl;   
  21.   
  22.     g_reator.uninit();  
  23. }  
  24.   
  25. void daemon_run()  
  26. {  
  27.     int pid;  
  28.     signal(SIGCHLD, SIG_IGN);  
  29.     //1)在父进程中,fork返回新创建子进程的进程ID;  
  30.     //2)在子进程中,fork返回0;  
  31.     //3)如果出现错误,fork返回一个负值;  
  32.     pid = fork();  
  33.     if (pid < 0)  
  34.     {  
  35.         std:: cout << "fork error" << std::endl;  
  36.         exit(-1);  
  37.     }  
  38.     //父进程退出,子进程独立运行  
  39.     else if (pid > 0) {  
  40.         exit(0);  
  41.     }  
  42.     //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,  
  43.     //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。  
  44.     //执行setsid()之后,child将重新获得一个新的会话(session)id。  
  45.     //这时parent退出之后,将不会影响到child了。  
  46.     setsid();  
  47.     int fd;  
  48.     fd = open("/dev/null", O_RDWR, 0);  
  49.     if (fd != -1)  
  50.     {  
  51.         dup2(fd, STDIN_FILENO);  
  52.         dup2(fd, STDOUT_FILENO);  
  53.         dup2(fd, STDERR_FILENO);  
  54.     }  
  55.     if (fd > 2)  
  56.         close(fd);  
  57. }  
  58.   
  59.   
  60. int main(int argc, char* argv[])  
  61. {    
  62.     //设置信号处理  
  63.     signal(SIGCHLD, SIG_DFL);  
  64.     signal(SIGPIPE, SIG_IGN);  
  65.     signal(SIGINT, prog_exit);  
  66.     signal(SIGKILL, prog_exit);  
  67.     signal(SIGTERM, prog_exit);  
  68.       
  69.     short port = 0;  
  70.     int ch;  
  71.     bool bdaemon = false;  
  72.     while ((ch = getopt(argc, argv, "p:d")) != -1)  
  73.     {  
  74.         switch (ch)  
  75.         {  
  76.         case 'd':  
  77.             bdaemon = true;  
  78.             break;  
  79.         case 'p':  
  80.             port = atol(optarg);  
  81.             break;  
  82.         }  
  83.     }  
  84.   
  85.     if (bdaemon)  
  86.         daemon_run();  
  87.   
  88.   
  89.     if (port == 0)  
  90.         port = 12345;  
  91.   
  92.       
  93.     if (!g_reator.init("0.0.0.0", 12345))  
  94.         return -1;  
  95.       
  96.     g_reator.main_loop(&g_reator);  
  97.   
  98.     return 0;  
  99. }  

完整实例代码下载地址:

普通版本:https://pan.baidu.com/s/1o82Mkno

C++11版本:https://pan.baidu.com/s/1dEJdrih


2011-05-13 17:29:00 qwidget 阅读数 5358

曾经维护维护过一个服务器项目有用到ACE,也是第一次从中学习到reactor和proactor两种并发模式,但仅限于用,而且linux上只用reactor,据说是linux系统本身对异步I/O支持程度等各种因素影响了proactor,关于windows上的ACE应用还得承认孤陋寡闻没见过,好像大家都宁愿用IOCP自己封装一套(本质跟proactor差不多了)。由于ACE源码可读性较差,项目没出现大问题也就懒得去细读其源码了。现在终于发现了一个可读性不错的网络库poco,作者面向对象的功底很深厚,值得细读。我手中的poco版本应该算新,是1.4.1p1。其中多路分派网络并发模型的底层实现分开两种情形,platform是支持epoll的较高版本linux的话,就用epoll,其他可支持的平台,如windows和低版本linux,用的都是select。

 

最后的精彩时刻又到了,关于SocketReactor的读后总结,上图:

 

reactor模式C++实现

阅读数 2860

没有更多推荐了,返回首页