ajax请求例子 react
2018-07-27 09:00:13 u013219087 阅读数 678
Reactor模式是把一个完整的IO操作分成几部分来进行,使用一个或多个Reactor来处理IO请求,对于具体的IO读写以及数据读取完成后的操作,分成更细粒度的模块,根据具体的需求和模块的耗时,来选择性使用单线程或是多线程或者线程池来完成整个IO操作。

来看具体的例子:
Reactor简单例子
Reactor简单例子

Reactor简单例子
Reactor实现了Runnable接口,为多线程做准备(实际应用中未必要使用多线程,很多时候使用单线程);
注册accpet事件;
设置通道为非阻塞。

Reactor简单例子
Reactor简单例子
这里的while (true)和单线程的IO多路复用一致,不同的是在dispatch分发这里,这里可以选择单线程继续执行,也可以使用多线程执行,根据IO操作的耗时来决定。

这里的key.attachment()第一次是在Reactor初始化时设置的:
Reactor简单例子
attach了一个Acceptor对象。

接下来来看Acceptor:
Reactor简单例子
Acceptor同样实现了Runnable接口,为多线程做准备,根据实际需要来决定是否使用多线程。
Reactor简单例子
在Acceptor里,对客户端连接的请求进行处理:
设置非阻塞模式;
注册读事件;
attach具体的处理handler。
selector.wakeup()针对多线程,唤醒selector.select()方法上阻塞的线程。
这里sk.attach(new Handler(sk, sc));后,会返回到Reactor的while(true)主循环里dispatch()方法进行处理:
Reactor简单例子

最后来看具体的处理Handler:
Reactor简单例子
Handler同样实现Runnable接口,为多线程做准备。
添加一个state变量,用来区分 读/写 操作。
Reactor简单例子
Reactor简单例子

Reactor简单例子
在Handler里进行具体的IO读写并且对 读/写 监听事件进行切换。



ajax请求例子 react 相关内容

2018-08-05 17:17:36 Guo_binglo 阅读数 166

程序的功能一个简单的echo服务:客户端连接上服务器之后,给服务器发送信息,服务器加上时间戳等信息后返回给客户端。

/**   
 *@desc:用reactor模式练习服务器程序,main.cpp
 *@author: zhangyl
 *@date:   2016.11.23
  */  
 #include <iostream>
 #include <string.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>    //for htonl() and htons()
 #include <unistd.h>
 #include <fcntl.h>
 #include <sys/epoll.h>
 #include <signal.h>    //for signal()
 #include <pthread.h>
 #include <semaphore.h>
 #include <list>
 #include <errno.h>
 #include <time.h>
 #include <sstream>
 #include <iomanip>     //for std::setw()/setfill()
 #include <stdlib.h>  

#define WORKER_THREAD_NUM   5  
#define min(a, b) ((a <= b) ? (a) : (b))   
int g_epollfd = 0;
bool g_bStop = false;
int g_listenfd = 0;
pthread_t g_acceptthreadid = 0;
pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };  
pthread_cond_t g_acceptcond;
pthread_mutex_t g_acceptmutex;  
pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;  
pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;  
pthread_mutex_t g_clientmutex;  
std::list<int> g_listClients;  
void prog_exit(int signo)
{  
  ::signal(SIGINT, SIG_IGN);  
  ::signal(SIGKILL, SIG_IGN);  
  ::signal(SIGTERM, SIG_IGN);  

  std::cout << "program recv signal " << signo
            << " to exit." << std::endl;  

  g_bStop = true;  

  ::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);  

  //TODO: 是否需要先调用shutdown()一下?  
  ::shutdown(g_listenfd, SHUT_RDWR);  
  ::close(g_listenfd);  
  ::close(g_epollfd);  

  ::pthread_cond_destroy(&g_acceptcond);  
  ::pthread_mutex_destroy(&g_acceptmutex);  

  ::pthread_cond_destroy(&g_cond);  
  ::pthread_mutex_destroy(&g_mutex);  

  ::pthread_mutex_destroy(&g_clientmutex);
}  
bool create_server_listener(const char* ip, short port)
{  //socket()用于创建一个socket描述符
  g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
  if (g_listenfd == -1)  
      return false;  

  int on = 1;  
  ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR,
               (char *)&on, sizeof(on));  
  ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT,
               (char *)&on, sizeof(on));  

    //初始化地址
  struct sockaddr_in servaddr;  
  memset(&servaddr, 0, sizeof(servaddr));   
  servaddr.sin_family = AF_INET;  
  servaddr.sin_addr.s_addr = inet_addr(ip);  
  servaddr.sin_port = htons(port);  
  // //bind()函数把一个地址族中的特定地址赋给socket 
  if (::bind(g_listenfd, (sockaddr *)&servaddr,sizeof(servaddr)) == -1)
      return false;  

//调用listen()来监听这个socket,如果客户端这时调用connect()发出连接请求,服务器端就会接收到这个请求
  if (::listen(g_listenfd, 50) == -1)  
      return false;  

  g_epollfd = ::epoll_create(1);  
  if (g_epollfd == -1)  
      return false;  

  struct epoll_event e;  
  memset(&e, 0, sizeof(e)); //将e中当前位置后面的e大小个字节 用0 替换并返回 e 。 
  e.events = EPOLLIN | EPOLLRDHUP;  
  e.data.fd = g_listenfd;     
  //监听g_listenfd是否发生
  if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)  
      return false;  

  return true;
}  
void release_client(int clientfd)
{  
  if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
      std::cout << "release client socket failed as call epoll_ctl failed"
                << std::endl;  

  ::close(clientfd);
}  
void* accept_thread_func(void* arg)
{     
  while (!g_bStop)  
  {  
      ::pthread_mutex_lock(&g_acceptmutex);  //给互斥体变量加锁 
      ::pthread_cond_wait(&g_acceptcond, &g_acceptmutex);  //用来等待条件变量被设置 等待调用需要一个已经上锁的互斥体mutex,这是为了防止在真正进入等待状态之前别的线程有可能设置该条件变量而产生竞争。
      //::pthread_mutex_lock(&g_acceptmutex);  

      //std::cout << "run loop in accept_thread_func" << std::endl;  

/*TCP服务器端依次调用socket()、bind()、listen()之后,
就会监听指定的socket地址了。TCP客户端依次调用socket()、
connect()之后就向TCP服务器发送了一个连接请求。
TCP服务器监听到这个请求之后,就会调用accept()函数取接收请求,
这样连接就建立好了。*/
      struct sockaddr_in clientaddr;  
      socklen_t addrlen;  
      int newfd = ::accept(g_listenfd,
                           (struct sockaddr *)&clientaddr, &addrlen);  
      ::pthread_mutex_unlock(&g_acceptmutex);  //给互斥体变量解除锁
      if (newfd == -1)  
          continue;  

      std::cout << "new client connected: "
                << ::inet_ntoa(clientaddr.sin_addr) << ":" //将一个十进制网络字节序转换为点分十进制IP格式的字符串
                << ::ntohs(clientaddr.sin_port) << std::endl;  //将一个16位数由网络字节顺序转换为主机字节顺序

      //将新socket设置为non-blocking        设置非阻塞
      int oldflag = ::fcntl(newfd, F_GETFL, 0);  
      int newflag = oldflag | O_NONBLOCK;  
      if (::fcntl(newfd, F_SETFL, newflag) == -1)  
      {  
          std::cout << "fcntl error, oldflag =" << oldflag
                    << ", newflag = " << newflag << std::endl;  
          continue;  
      }  

      //监听连接进来的fd
      struct epoll_event e;  
      memset(&e, 0, sizeof(e));  
      e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
      e.data.fd = newfd;  
      if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)  
      {  
          std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
      }  
  }  

  return NULL;
}  

void* worker_thread_func(void* arg)
{     
  while (!g_bStop)  
  {  
      int clientfd;  
      ::pthread_mutex_lock(&g_clientmutex);  //给互斥体变量加锁
      while (g_listClients.empty())  
          ::pthread_cond_wait(&g_cond, &g_clientmutex); //用来等待条件变量被设置 
      clientfd = g_listClients.front();  
      g_listClients.pop_front();    
      pthread_mutex_unlock(&g_clientmutex);  //给互斥体变量解除锁

      //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来  
      std::cout << std::endl;  

      std::string strclientmsg;  
      char buff[256];  
      bool bError = false;  
      while (true)  
      {  
          memset(buff, 0, sizeof(buff));  
          int nRecv = ::recv(clientfd, buff, 256, 0);  //接收数据
          if (nRecv == -1)  
          {  
              if (errno == EWOULDBLOCK)  
                  break;  
              else  
              {  
                  std::cout << "recv error, client disconnected, fd = "
                            << clientfd << std::endl;  
                  release_client(clientfd);  
                  bError = true;  
                  break;  
              }  

          }  
          //对端关闭了socket,这端也关闭。  
          else if (nRecv == 0)  
          {  
              std::cout << "peer closed, client disconnected, fd = "
                        << clientfd << std::endl;  
              release_client(clientfd); // 将clientfd从epoll_fd删除掉
              bError = true;  
              break;  
          }  

          strclientmsg += buff;  //将数据加到strclientmsg
      }  

      //出错了,就不要再继续往下执行了  
      if (bError)  
          continue;  

      std::cout << "client msg: " << strclientmsg;  

      //将消息加上时间标签后发回  
      time_t now = time(NULL);  
      struct tm* nowstr = localtime(&now);  
      std::ostringstream ostimestr;  
      ostimestr << "[" << nowstr->tm_year + 1900 << "-"   
                << std::setw(2) << std::setfill('0')
                << nowstr->tm_mon + 1 << "-"   
                << std::setw(2) << std::setfill('0')
                << nowstr->tm_mday << " "  
                << std::setw(2) << std::setfill('0')
                << nowstr->tm_hour << ":"   
                << std::setw(2) << std::setfill('0')
                << nowstr->tm_min << ":"   
                << std::setw(2) << std::setfill('0')
                << nowstr->tm_sec << "]server reply: ";  

      strclientmsg.insert(0, ostimestr.str());  

      while (true)  
      {  
          int nSent = ::send(clientfd, strclientmsg.c_str(), //发送数据
                             strclientmsg.length(), 0);  
          if (nSent == -1)  
          {  
              if (errno == EWOULDBLOCK)  
              {  
                  ::sleep(10);  
                  continue;  
              }  
              else  
              {  
                  std::cout << "send error, fd = "
                            << clientfd << std::endl;  
                  release_client(clientfd);  
                  break;  
              }  

          }            

          std::cout << "send: " << strclientmsg;  
          strclientmsg.erase(0, nSent);  //删除从0开始的nSent大小个字符

          if (strclientmsg.empty())  
              break;  
      }  
  }  

  return NULL;
}  
void daemon_run()
{  
  int pid;  
  signal(SIGCHLD, SIG_IGN);  //注册信号,屏蔽SIGCHLD信号,子进程退出,将不会给父进程发送信号,因此也不会出现僵尸进程
  //1)在父进程中,fork返回新创建子进程的进程ID;  
  //2)在子进程中,fork返回0;  
  //3)如果出现错误,fork返回一个负值;  
  pid = fork();  
  if (pid < 0)  
  {  
      std:: cout << "fork error" << std::endl;  
      exit(-1);  
  }  
  //父进程退出,子进程独立运行  
  else if (pid > 0) {  
      exit(0);  
  }  
  //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,  
  //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。  
  //执行setsid()之后,child将重新获得一个新的会话(session)id。  
  //这时parent退出之后,将不会影响到child了。  
  setsid();  
  int fd;  
  fd = open("/dev/null", O_RDWR, 0);  //写入/dev/null的东西会被系统丢掉 对stdin/stdout/stderr进行保护
  if (fd != -1)  
  {                                //dup2()使一个文件描述符等效于另外一个文件描述符
      dup2(fd, STDIN_FILENO);  //STDIN_FILENO代表指向标准输入的文件描述符,它的值为0,这个函数会关闭标准输入。
      dup2(fd, STDOUT_FILENO);  ////把输出重定向到fd标识的文件
      dup2(fd, STDERR_FILENO);  
  }  
  if (fd > 2)  
      close(fd);  
 }  

int main(int argc, char* argv[])
{    
  short port = 0;  
  int ch;  
  bool bdaemon = false;  
  /*
    while((ch = getopt(argc,argv,"a:bcde"))!= -1)
  {
  switch(ch)
  {
  case : printf("xxxtest");
  case 'a': printf("option a:’%s’\n",optarg); break;
  case 'b': printf("option b :b\n"); break;
  default: printf("other option :%c\n",ch);
  }
  printf("optopt +%c\n",optopt);
  }
  return 0;
  }

    执行 $./getopt –b
    option b:b
    执行 $./getopt –c
    other option:c
    执行 $./getopt –a
    other option :?
    执行 $./getopt –a12345
    option a:’12345’
  */
  while ((ch = getopt(argc, argv, "p:d")) != -1)  //分析命令行参数。参数argc和argv分别代表参数个数和内容和选项字符串
  {  
      switch (ch)  
      {  
      case 'd':  
          bdaemon = true;  
          break;  
      case 'p':  
          port = atol(optarg); //把字符串转换成长整型数
          break;  
      }  
  }  

  if (bdaemon)  
      daemon_run();  


  if (port == 0)  
      port = 12345;  

  if (!create_server_listener("0.0.0.0", port)) //创建监听 
  {  
      std::cout << "Unable to create listen server: ip=0.0.0.0, port="
                << port << "." << std::endl;  
      return -1;  
  }  


  //设置信号处理  
  signal(SIGCHLD, SIG_DFL);  
  signal(SIGPIPE, SIG_IGN);  
  signal(SIGINT, prog_exit);  
  signal(SIGKILL, prog_exit);  
  signal(SIGTERM, prog_exit);  

  //这是要做线程同步
  ::pthread_cond_init(&g_acceptcond, NULL);  //创建一个条件变量
  ::pthread_mutex_init(&g_acceptmutex, NULL);  //按缺省的属性初始化互斥体变量mutex 

  ::pthread_cond_init(&g_cond, NULL);  
  ::pthread_mutex_init(&g_mutex, NULL);  

  ::pthread_mutex_init(&g_clientmutex, NULL);  

//创建工作线程执行accept_thread_func函数,  accept_thread_func有新客户端连接进来,监听客户端fd
  ::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);  
  //启动工作线程  执行worker_thread_func,接收数据以及发送数据
  for (int i = 0; i < WORKER_THREAD_NUM; ++i)  
  {  
      ::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);  
  }  

  while (!g_bStop)  
  {         
      struct epoll_event ev[1024];    //这里监听的是 是否有新连接进来 和 客户端的socket上是否有可读事件
      int n = ::epoll_wait(g_epollfd, ev, 1024, 10);  
      if (n == 0)  
          continue;  
      else if (n < 0)  
      {  
          std::cout << "epoll_wait error" << std::endl;  
          continue;  
      }  

      int m = min(n, 1024);  
      for (int i = 0; i < m; ++i)  
      {  
          //通知接收连接线程接收新连接  
          if (ev[i].data.fd == g_listenfd)  
              pthread_cond_signal(&g_acceptcond);  
          //通知普通工作线程接收数据  
          else  
          {                 
              pthread_mutex_lock(&g_clientmutex);                
              g_listClients.push_back(ev[i].data.fd);  
              pthread_mutex_unlock(&g_clientmutex);  
              pthread_cond_signal(&g_cond);  
              //std::cout << "signal" << std::endl;  
          }  

      }  

  }  

  return 0;
}

程序的大致框架是:

主线程只负责监听侦听socket上是否有新连接,如果有新连接到来,交给一个叫accept的工作线程去接收新连接,并将新连接socket绑定到主线程使用epollfd上去。

主线程如果侦听到客户端的socket上有可读事件,则通知另外五个工作线程去接收处理客户端发来的数据,并将数据加上时间戳后发回给客户端。

可以通过传递-p port来设置程序的监听端口号;可以通过传递-d来使程序以daemon模式运行在后台。这也是标准linux daemon模式的书写方法。

程序难点和需要注意的地方是:

1、条件变量为了防止虚假唤醒,一定要在一个循环里面调用pthread_cond_wait()函数,我在worker_thread_func()中使
while (g_listClients.empty())
::pthread_cond_wait(&g_cond, &g_clientmutex);
cept_thread_func()函数里面我没有使用循环,这样会有问题吗?

2、使用条件变量pthread_cond_wait()函数的时候一定要先获得与该条件变量相关的mutex,即像下面这样的结构:

1  mutex_lock(...);  
2  while (condition is true)  
3    ::pthread_cond_wait(...);  
4  //这里可以有其他代码...  mutex_unlock(...);  
5  //这里可以有其他代码...

因为pthread_cond_wait()如果阻塞的话,它解锁相关mutex和阻塞当前线程这两个动作加在一起是原子的。

3、作为服务器端程序最好对侦听socket调用setsocketopt()设置SO_REUSEADDR和SO_REUSEPORT两个标志,因为服务程序有时候会需要重启(比如调试的时候就会不断重启),如果不设置这两个标志的话,绑定端口时就会调用失败。因为一个端口使用后,即使不再使用,因为四次挥手该端口处于TIME_WAIT状态,有大约2min的MSL(Maximum Segment Lifetime,最大存活期)。这2min内,该端口是不能被重复使用的。你的服务器程序上次使用了这个端口号,接着重启,因为这个缘故,你再次绑定这个端口就会失败(bind函数调用失败)。要不你就每次重启时需要等待2min后再试(这在频繁重启程序调试是难以接收的),或者设置这种SO_REUSEADDR和SO_REUSEPORT立即回收端口使用。
其实,SO_REUSEADDR在windows上和Unix平台上还有些细微的区别,我在libevent源码中看到这样的描述:

 1int evutil_make_listen_socket_reuseable(evutil_socket_t sock)  
 2{  
 3 #ifndef WIN32  
 4    int one = 1;  
 5    /* REUSEADDR on Unix means, "don't hang on to this address after the 
 6     * listener is closed."  On Windows, though, it means "don't keep other 
 7     * processes from binding to this address while we're using it.
 8     */  
 9    return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,  
10        (ev_socklen_t)sizeof(one));  
11#else  
12    return 0;  
13#endif  
14}  

注意注释部分,在Unix平台上设置这个选项意味着,任意进程可以复用该地址;而在windows,不要阻止其他进程复用该地址。也就是在在Unix平台上,如果不设置这个选项,任意进程在一定时间内,不能bind该地址;在windows平台上,在一定时间内,其他进程不能bind该地址,而本进程却可以再次bind该地址。

4、epoll_wait对新连接socket使用的是边缘触发模式EPOLLET(edge trigger),而不是默认的水平触发模式(level trigger)。因为如果采取水平触发模式的话,主线程检测到某个客户端socket数据可读时,通知工作线程去收取该socket上的数据,这个时候主线程继续循环,只要在工作线程没有将该socket上数据全部收完,或者在工作线程收取数据的过程中,客户端有新数据到来,主线程会继续发通知(通过pthread_cond_signal())函数,再次通知工作线程收取数据。这样会可能导致多个工作线程同时调用recv函数收取该客户端socket上的数据,这样产生的结果将会导致数据错乱。
相反,采取边缘触发模式,只有等某个工作线程将那个客户端socket上数据全部收取完毕,主线程的epoll_wait才可能会再次触发来通知工作线程继续收取那个客户端socket新来的数据。

5、代码中有这样一行:
//gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来 std::cout << std::endl;
如果不加上这一行,正常运行服务器程序,程序中要打印到控制台的信息都会打印出来,但是如果用gdb调试状态下,程序的所有输出就不显示了。我不知道这是不是gdb的一个bug,所以这里加上std::endl来输出一个换行符并flush标准输出,让输出显示出来。(std::endl不仅是输出一个换行符而且是同时刷新输出,相当于fflush()函数)。

程序我部署起来了,你可以使用linux的nc命令或自己写程序连接服务器来查看程序效果,当然也可以使用telnet命令,方法:

linux:

nc 120.55.94.78 12345

telnet 120.55.94.78 12345

然后就可以给服务器自由发送数据了,服务器会给你发送的信息加上时间戳返回给你。效果如图:

另外我将这个代码改写了成纯C++11版本,使用CMake编译,为了支持编译必须加上这-std=c++11:

CMakeLists.txt代码如下:

 1cmake_minimum_required(VERSION 2.8)  
 2  PROJECT(myreactorserver)  
 3  AUX_SOURCE_DIRECTORY(./ SRC_LIST)
 4  SET(EXECUTABLE_OUTPUT_PATH ./)  
 5  ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated
 6                  -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64
 7                  -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR 
 8                  -DAC_HAS_CRITICAL -DTIXML_USE_STL
 9                  -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS}
10                  -std=c++11)  
11  INCLUDE_DIRECTORIES(  ./  )
12  LINK_DIRECTORIES(  ./  )  
13  set(  main.cpp  myreator.cpp  )  
14  ADD_EXECUTABLE(myreactorserver ${SRC_LIST})  
15  TARGET_LINK_LIBRARIES(myreactorserver pthread)  

myreactor.h文件内容:

 1/**
 2 *@desc: myreactor头文件, myreactor.h
 3 *@author: zhangyl
 4 *@date: 2016.12.03
 5 */
 6  #ifndef __MYREACTOR_H__
 7  #define __MYREACTOR_H__  
 8  #include <list>
 9  #include <memory>
10  #include <thread>
11  #include <mutex>
12  #include <condition_variable>  
13  #define WORKER_THREAD_NUM   5  
14  class CMyReactor
15  {
16  public:  
17    CMyReactor();  
18    ~CMyReactor();  
19
20    bool init(const char* ip, short nport);  
21    bool uninit();  
22
23    bool close_client(int clientfd);  
24
25    static void* main_loop(void* p);  
26  private:  
27    //no copyable  
28    CMyReactor(const CMyReactor& rhs);  
29    CMyReactor& operator = (const CMyReactor& rhs);  
30
31    bool create_server_listener(const char* ip, short port);  
32
33    static void accept_thread_proc(CMyReactor* pReatcor);  
34    static void worker_thread_proc(CMyReactor* pReatcor);  
35  private:  
36    //C11语法可以在这里初始化  
37    int                          m_listenfd = 0;  
38    int                          m_epollfd  = 0;  
39    bool                         m_bStop    = false;  
40
41    std::shared_ptr<std::thread> m_acceptthread;  
42    std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM];  
43
44    std::condition_variable      m_acceptcond;  
45    std::mutex                   m_acceptmutex;  
46
47    std::condition_variable      m_workercond ;  
48    std::mutex                   m_workermutex;  
49
50    std::list<int>                 m_listClients;
51  };  
52  #endif //!__MYREACTOR_H__ 

myreactor.cpp文件内容:

  1 /**
  2  *@desc: myreactor实现文件, myreactor.cpp
  3  *@author: zhangyl
  4  *@date: 2016.12.03
  5  */  #include "myreactor.h"
  6  #include <iostream>
  7  #include <string.h>
  8  #include <sys/types.h>
  9  #include <sys/socket.h>
 10  #include <netinet/in.h>
 11  #include <arpa/inet.h>  //for htonl() and htons()
 12  #include <fcntl.h>
 13  #include <sys/epoll.h>
 14  #include <list>
 15  #include <errno.h>
 16  #include <time.h>
 17  #include <sstream>
 18  #include <iomanip>   //for std::setw()/setfill()
 19  #include <unistd.h>  
 20  #define min(a, b) ((a <= b) ? (a) : (b))  
 21  CMyReactor::CMyReactor()
 22  {  
 23    //m_listenfd = 0;  
 24    //m_epollfd = 0;  
 25    //m_bStop = false;
 26  }  
 27  CMyReactor::~CMyReactor()
 28  {  
 29  }  
 30  bool CMyReactor::init(const char* ip, short nport)
 31  {  
 32    if (!create_server_listener(ip, nport))  
 33    {  
 34        std::cout << "Unable to bind: " << ip
 35                  << ":" << nport << "." << std::endl;  
 36        return false;  
 37    }  
 38
 39
 40    std::cout << "main thread id = " << std::this_thread::get_id()
 41              << std::endl;  
 42
 43    //启动接收新连接的线程  
 44    m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this));  
 45
 46    //启动工作线程  
 47    for (auto& t : m_workerthreads)  
 48    {  
 49        t.reset(new std::thread(CMyReactor::worker_thread_proc, this));  
 50    }  
 51
 52
 53    return true;
 54  }  
 55  bool CMyReactor::uninit()
 56  {  
 57    m_bStop = true;  
 58    m_acceptcond.notify_one();  
 59    m_workercond.notify_all();  
 60
 61    m_acceptthread->join();  
 62    for (auto& t : m_workerthreads)  
 63    {  
 64        t->join();  
 65    }  
 66
 67    ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL);  
 68
 69    //TODO: 是否需要先调用shutdown()一下?  
 70    ::shutdown(m_listenfd, SHUT_RDWR);  
 71    ::close(m_listenfd);  
 72    ::close(m_epollfd);  
 73
 74    return true;
 75  }  
 76  bool CMyReactor::close_client(int clientfd)
 77  {  
 78    if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
 79    {  
 80        std::cout << "close client socket failed as call epoll_ctl failed"
 81                  << std::endl;  
 82        //return false;  
 83    }  
 84
 85
 86    ::close(clientfd);  
 87
 88    return true;
 89  }  
 90
 91  void* CMyReactor::main_loop(void* p)
 92  {  
 93    std::cout << "main thread id = "
 94              << std::this_thread::get_id() << std::endl;  
 95
 96    CMyReactor* pReatcor = static_cast<CMyReactor*>(p);  
 97
 98    while (!pReatcor->m_bStop)  
 99    {  
100        struct epoll_event ev[1024];  
101        int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);  
102        if (n == 0)  
103            continue;  
104        else if (n < 0)  
105        {  
106            std::cout << "epoll_wait error" << std::endl;  
107            continue;  
108        }  
109
110        int m = min(n, 1024);  
111        for (int i = 0; i < m; ++i)  
112        {  
113            //通知接收连接线程接收新连接  
114            if (ev[i].data.fd == pReatcor->m_listenfd)  
115                pReatcor->m_acceptcond.notify_one();  
116            //通知普通工作线程接收数据  
117            else  
118            {  
119                {  
120                    std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
121                    pReatcor->m_listClients.push_back(ev[i].data.fd);  
122                }  
123
124                pReatcor->m_workercond.notify_one();  
125                //std::cout << "signal" << std::endl;  
126            }// end if  
127
128        }// end for-loop  
129    }// end while  
130
131    std::cout << "main loop exit ..." << std::endl;  
132
133    return NULL;
134  }  
135  void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)
136  {  
137    std::cout << "accept thread, thread id = "
138              << std::this_thread::get_id() << std::endl;  
139
140    while (true)  
141    {  
142        int newfd;  
143        struct sockaddr_in clientaddr;  
144        socklen_t addrlen;  
145        {  
146            std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);  
147            pReatcor->m_acceptcond.wait(guard);  
148            if (pReatcor->m_bStop)  
149                break;  
150
151            //std::cout << "run loop in accept_thread_proc" << std::endl;  
152
153            newfd = ::accept(pReatcor->m_listenfd,
154                              (struct sockaddr *)&clientaddr, &addrlen);  
155        }  
156        if (newfd == -1)  
157            continue;  
158
159        std::cout << "new client connected: "
160                  << ::inet_ntoa(clientaddr.sin_addr) << ":"      
161                  << ::ntohs(clientaddr.sin_port) << std::endl;  
162
163        //将新socket设置为non-blocking  
164        int oldflag = ::fcntl(newfd, F_GETFL, 0);  
165        int newflag = oldflag | O_NONBLOCK;  
166        if (::fcntl(newfd, F_SETFL, newflag) == -1)  
167        {  
168            std::cout << "fcntl error, oldflag =" << oldflag
169                      << ", newflag = " << newflag << std::endl;  
170            continue;  
171        }  
172
173        struct epoll_event e;  
174        memset(&e, 0, sizeof(e));  
175        e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
176        e.data.fd = newfd;  
177        if (::epoll_ctl(pReatcor->m_epollfd, 
178            EPOLL_CTL_ADD, newfd, &e) == -1)  
179        {  
180            std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
181        }  
182    }  
183
184    std::cout << "accept thread exit ..." << std::endl;
185  }  
186  void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
187  {  
188    std::cout << "new worker thread, thread id = "
189              << std::this_thread::get_id() << std::endl;  
190
191    while (true)  
192    {  
193        int clientfd;  
194        {  
195            std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
196            while (pReatcor->m_listClients.empty())  
197            {  
198                if (pReatcor->m_bStop)  
199                {  
200                    std::cout << "worker thread exit ..." << std::endl;  
201                    return;  
202                }  
203
204                pReatcor->m_workercond.wait(guard);  
205            }  
206
207            clientfd = pReatcor->m_listClients.front();  
208            pReatcor->m_listClients.pop_front();  
209        }  
210
211        //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来  
212        std::cout << std::endl;  
213
214        std::string strclientmsg;  
215        char buff[256];  
216        bool bError = false;  
217        while (true)  
218        {  
219            memset(buff, 0, sizeof(buff));  
220            int nRecv = ::recv(clientfd, buff, 256, 0);  
221            if (nRecv == -1)  
222            {  
223                if (errno == EWOULDBLOCK)  
224                    break;  
225                else  
226                {  
227                    std::cout << "recv error, client disconnected, fd = "
228                              << clientfd << std::endl;  
229                    pReatcor->close_client(clientfd);  
230                    bError = true;  
231                    break;  
232                }  
233
234            }  
235            //对端关闭了socket,这端也关闭。  
236            else if (nRecv == 0)  
237            {  
238                std::cout << "peer closed, client disconnected, fd = "
239                          << clientfd << std::endl;  
240                pReatcor->close_client(clientfd);  
241                bError = true;  
242                break;  
243            }  
244
245            strclientmsg += buff;  
246        }  
247
248        //出错了,就不要再继续往下执行了  
249        if (bError)  
250            continue;  
251
252        std::cout << "client msg: " << strclientmsg;  
253
254        //将消息加上时间标签后发回  
255        time_t now = time(NULL);  
256        struct tm* nowstr = localtime(&now);  
257        std::ostringstream ostimestr;  
258        ostimestr << "[" << nowstr->tm_year + 1900 << "-"  
259            << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"  
260            << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "  
261            << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"  
262            << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"  
263            << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";  
264
265        strclientmsg.insert(0, ostimestr.str());  
266
267        while (true)  
268        {  
269            int nSent = ::send(clientfd, strclientmsg.c_str(), 
270                               strclientmsg.length(), 0);  
271            if (nSent == -1)  
272            {  
273                if (errno == EWOULDBLOCK)  
274                {  
275                    std::this_thread::sleep_for(std::chrono::milliseconds(10));  
276                    continue;  
277                }  
278                else  
279                {  
280                    std::cout << "send error, fd = "
281                              << clientfd << std::endl;  
282                    pReatcor->close_client(clientfd);  
283                    break;  
284                }  
285
286            }  
287
288            std::cout << "send: " << strclientmsg;  
289            strclientmsg.erase(0, nSent);  
290
291            if (strclientmsg.empty())  
292                break;  
293        }  
294    }
295  }  
296  bool CMyReactor::create_server_listener(const char* ip, short port)
297  {  
298    m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
299    if (m_listenfd == -1)  
300        return false;  
301
302    int on = 1;  
303    ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR,
304                (char *)&on, sizeof(on));  
305    ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT,
306                (char *)&on, sizeof(on));  
307
308    struct sockaddr_in servaddr;  
309    memset(&servaddr, 0, sizeof(servaddr));  
310    servaddr.sin_family = AF_INET;  
311    servaddr.sin_addr.s_addr = inet_addr(ip);  
312    servaddr.sin_port = htons(port);  
313    if (::bind(m_listenfd, (sockaddr *)&servaddr, 
314         sizeof(servaddr)) == -1)  
315        return false;  
316
317    if (::listen(m_listenfd, 50) == -1)  
318        return false;  
319
320    m_epollfd = ::epoll_create(1);  
321    if (m_epollfd == -1)  
322        return false;  
323
324    struct epoll_event e;  
325    memset(&e, 0, sizeof(e));  
326    e.events = EPOLLIN | EPOLLRDHUP;  
327    e.data.fd = m_listenfd;  
328    if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1)  
329        return false;  
330
331    return true;
332  }  

main.cpp文件内容:

  1/**  
  2 *@desc:   用reactor模式练习服务器程序 
  3 *@author: zhangyl 
  4 *@date:   2016.12.03 
  5 */  
  6
  7#include <iostream>  
  8#include <signal.h>     //for signal()  
  9#include<unistd.h>  
 10#include <stdlib.h>       //for exit()  
 11#include <sys/types.h>  
 12#include <sys/stat.h>  
 13#include <fcntl.h>  
 14#include "myreactor.h"  
 15
 16CMyReactor g_reator;  
 17
 18void prog_exit(int signo)  
 19{  
 20    std::cout << "program recv signal " << signo
 21              << " to exit." << std::endl;   
 22
 23    g_reator.uninit();  
 24}  
 25
 26void daemon_run()  
 27{  
 28    int pid;  
 29    signal(SIGCHLD, SIG_IGN);  
 30    //1)在父进程中,fork返回新创建子进程的进程ID;  
 31    //2)在子进程中,fork返回0;  
 32    //3)如果出现错误,fork返回一个负值;  
 33    pid = fork();  
 34    if (pid < 0)  
 35    {  
 36        std:: cout << "fork error" << std::endl;  
 37        exit(-1);  
 38    }  
 39    //父进程退出,子进程独立运行  
 40    else if (pid > 0)
 41   {  
 42        exit(0);  
 43    }  
 44    //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,  
 45    //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。  
 46    //执行setsid()之后,child将重新获得一个新的会话(session)id。  
 47    //这时parent退出之后,将不会影响到child了。  
 48    setsid();  
 49    int fd;  
 50    fd = open("/dev/null", O_RDWR, 0);  
 51    if (fd != -1)  
 52    {  
 53        dup2(fd, STDIN_FILENO);  
 54        dup2(fd, STDOUT_FILENO);  
 55        dup2(fd, STDERR_FILENO);  
 56    }  
 57    if (fd > 2)  
 58        close(fd);  
 59}  
 60
 61
 62int main(int argc, char* argv[])  
 63{    
 64    //设置信号处理  
 65    signal(SIGCHLD, SIG_DFL);  
 66    signal(SIGPIPE, SIG_IGN);  
 67    signal(SIGINT, prog_exit);  
 68    signal(SIGKILL, prog_exit);  
 69    signal(SIGTERM, prog_exit);  
 70
 71    short port = 0;  
 72    int ch;  
 73    bool bdaemon = false;  
 74    while ((ch = getopt(argc, argv, "p:d")) != -1)  
 75    {  
 76        switch (ch)  
 77        {  
 78        case 'd':  
 79            bdaemon = true;  
 80            break;  
 81        case 'p':  
 82            port = atol(optarg);  
 83            break;  
 84        }  
 85    }  
 86
 87    if (bdaemon)  
 88        daemon_run();  
 89
 90
 91    if (port == 0)  
 92        port = 12345;  
 93
 94
 95    if (!g_reator.init("0.0.0.0", 12345))  
 96        return -1;  
 97
 98    g_reator.main_loop(&g_reator);  
 99
100    return 0;  
101}  

ajax请求例子 react 相关内容

2008-05-06 15:15:00 superyao2008 阅读数 1648

reactor 模式是ACE当中比较简单的一个,不初学的还是有个地方容易搞错以致程序无法运行,这是我写过跳过的一个简单的用reactor模式做客户端的练习。高手看到不足之处还请赐教。

HandlerClient.h

 

#ifndef HANDLER_CLIENT_H
#define HANDLER_CLIENT_H
#include 
<string>
#include 
"ace/Reactor.h"
#include 
"ace/SOCK_Connector.h"
#define REV_BUFSIZE 1024

class HandlerClient: public ACE_Event_Handler{
public:
    
int open();

    ACE_HANDLE get_handle() 
const;

    
int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);

    
int handle_output(ACE_HANDLE fd=ACE_INVALID_HANDLE);

    
int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);

public:
    std::
string _addr;
    unsigned 
short _port;

private:
    ACE_SOCK_Stream peer;
    
char revbuf[REV_BUFSIZE];

};
#endif

 

 

HandlerClient.cpp

 

#include "HandlerClient.h"

int HandlerClient::open(){
    ACE_SOCK_Connector connector;
    ACE_INET_Addr addr(_port,_addr.c_str());
    ACE_Time_Value timeout(
5,0);
    
if(connector.connect(peer,addr,&timeout) != 0)
        {
            ACE_ERROR_RETURN( (LM_ERROR,ACE_LIB_TEXT(
"error connect server %s:%d "),_addr.c_str(),_port),-1 );
        }
    
else{
        ACE_DEBUG( (LM_DEBUG,ACE_LIB_TEXT(
"connect server %s:%d done "),_addr.c_str(),_port) );
    }
    
return reactor()->register_handler(this,ACE_Event_Handler::READ_MASK|ACE_Event_Handler::WRITE_MASK);
}

ACE_HANDLE HandlerClient::get_handle() 
const{
    
return peer.get_handle();
}

int HandlerClient::handle_output(ACE_HANDLE fd ){
    size_t r
=peer.send("hello from client.",64);
    ACE_DEBUG( (LM_DEBUG,ACE_LIB_TEXT(
"send %d bytes to server "),r) );
    
return 0;
}

int HandlerClient::handle_input(ACE_HANDLE fd ){
    size_t r
=peer.recv(revbuf,REV_BUFSIZE);
    ACE_DEBUG( (LM_DEBUG,ACE_LIB_TEXT(
"receive from server: %s "),revbuf) );
    
return 0;

}

int HandlerClient::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask){
    
if (peer.get_handle()!=INVALID_HANDLE_VALUE)
    {
        reactor()
->remove_handler(this,ACE_Event_Handler::ALL_EVENTS_MASK);
        peer.close();
    }
    
return 0;
}

 

 

main.cpp

 

#include "HandlerClient.h"

int ACE_TMAIN(int arg_num, ACE_TCHAR *[]){
    HandlerClient c;
    c._addr
="localhost";
    c._port
=7144;
    c.reactor(ACE_Reactor::instance());
    c.open();
    ACE_Reactor::instance()
->run_reactor_event_loop();
    
return 0;
}

 

 

ajax请求例子 react 相关内容

2012-08-14 09:38:41 lzy0168 阅读数 1900

此文版权属于作者所有,任何人、媒体或者网站转载、借用都必须征得作者本人同意!

ACE 使用方法及例子,网上有不少,下面贴一段我写的采用 ACE Reactor 模式写的 echo 服务的例子代码,通过例子可以看出,采用 ACE 开发多客户端的服务程序那是相当简单的!

代码中,handle_input()和 handle_output()都会对 _bufs 进行操作,因为这两个函数都是运行在 reactor 的线程里,不会冲突,所以没有必要对 _bufs 的操作进行锁操作。

/* $Id: cpp.tpl 3412 2009-11-14 14:23:44Z luozhiyong $ */
/**
* \file ACEReactorSvrSample.cpp
*
* \brief 采用ACE Reactor 实现服务程序例子
*
* \version $Rev: 3412 $
* \author  
* \date     2009年09月08日08:17:10
*
* \note 修改历史:<br>
* <table>
*     <tr><th>日期</th><th>修改人</th><th>内容</th></tr>
*     <tr><td>2009-9-8</td><td></td><td>创建初稿</td>
*     </tr>
* </table>
*/
#include <ace/Message_Block.h>
#include <ace/Svc_Handler.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/Acceptor.h>
#include <ace/Select_Reactor.h>
#include <list>
#include <string>
 
#ifdef _DEBUG
#    define ACE_RT_OPT "d"
#else
#    define ACE_RT_OPT
#endif
 
#if defined_DLL
#    define ACE_LIB_THREAD_OPT
#else
#    define ACE_LIB_THREAD_OPT "s"
#endif
 
#pragma comment(lib, "ACE"ACE_LIB_THREAD_OPT ACE_RT_OPT ".lib")
 
class EchoService
     : public ACE_Event_Handler
{
public:
     typedef ACE_SOCK_STREAM stream_type;
     typedef EchoService my_type;
     typedef ACE_Acceptor<my_type, ACE_SOCK_ACCEPTOR> acceptor_type;
 
     EchoService()
     {
         printf("EchoService创建\n");
     }
 
     ~EchoService()
     {
         printf("EchoService销毁\n");
     }
 
     // 响应socket 已经打开,连接已经建立事件
     int open(void*)
     {
         // 注册读事件
         if (reactor()->register_handler(this,ACE_Event_Handler::READ_MASK))
         {
              // 无法注册handler
              return -1;
         }
 
         // 注册写事件
         if (reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK))
         {
              // 无法注册handler
              return -1;
         }
 
         // 取消写事件,等待有数据时唤醒
         reactor()->cancel_wakeup(this,ACE_Event_Handler::WRITE_MASK);
 
         printf("EchoService已打开\n");
 
         return 0;
     }
 
     // 响应有数据可读事件
     int handle_input(ACE_HANDLE)
     {
         char buf[24];
         ssize_t c = _peer.recv(buf,sizeof(buf) - 1);
 
         if (c == 0)
         {
              // 连接已经关闭
              return -1;
         }
 
         _bufs.push_back(std::string(buf,c));
 
         if (_bufs.size() == 1)
         {
              // 缓冲区尺寸为1 说明原来缓冲区为空,写事件是取消的,这里唤醒它
              reactor()->schedule_wakeup(this,ACE_Event_Handler::WRITE_MASK);
         }
 
         return 0;
     }
 
     // 响应可以发送数据了事件
     int handle_output(ACE_HANDLE)
     {
         while (!_bufs.empty())
         {
              std::string&buf(*_bufs.begin());
              char const*      s(buf.c_str());
              char const*const e(s +buf.size());
              while (s !=e)
              {
                   ssize_t c(_peer.send(s,e - s));
                   if (c == -1 ||c == 0)
                   {
                       // 发送不成功不论发送过程中是否发生阻塞,
                       if (ACE_OS::last_error() ==EWOULDBLOCK)
                       {
                            // 输出缓冲区满,无法再发送数据了(如果你还是继续发送数据,发送会阻塞的)
                            break;
                       }else{
                            // 连接已关闭
                            break;
                       }
                   }else{
                       s += c;
                   }
              }
              if (s ==e)
              {
                   _bufs.pop_front();
              }else{
                   buf = std::string(s,e - s);
                   break;
              }
         }
         if (_bufs.empty())
         {
              // 缓冲区为空,取消写事件监听
              reactor()->cancel_wakeup(this,ACE_Event_Handler::WRITE_MASK);
         }
         // 不论发送是否成功都返回0,因为,如果发送失败,handle_input 也会发生读失败事件,
         // 错误处理有handle_input 返回-1 来触发
         return 0;
     }
    
     int handle_close(ACE_HANDLE = ACE_INVALID_HANDLE, ACE_Reactor_Mask mask = ACE_Event_Handler::ALL_EVENTS_MASK)
     {
         if (mask ==ACE_Event_Handler::WRITE_MASK)
              return 0;
         _peer.close();
         delete this;
         return 0;
     }
 
     // 这个函数主要给reactor::register_handler 时使用的
     ACE_HANDLE get_handle () const
     {
         return _peer.get_handle();
     }
 
     // 这个函数主要给acceptor 使用的
     stream_type& peer()
     {
         return _peer;
     }
 
     // 这个函数主要给acceptor 使用的
     int close (u_long = 0)
     {
         return handle_close();
     }
 
private:
     stream_type _peer;
     std::list<std::string>_bufs;
};
 
int main(int /*argc*/,char* /*argv*/[])
{
     u_short port = 20001;
     ACE_Reactor::instance(newACE_Reactor(newACE_Select_Reactor, true));
     EchoService::acceptor_typeacceptor;
     ACE_INET_Addr svrAddr(port);
     if (acceptor.open(svrAddr))
     {
         fprintf(stderr,"服务打开失败:%s\n",ACE_OS::strerror(ACE_OS::last_error()));
         return 1;
     }else{
         fprintf(stdout,"服务已打开,端口为:%u\n",port);
         ACE_Reactor::instance()->run_reactor_event_loop();
         return 0;
     }
}

 

ajax请求例子 react 相关内容

2018-12-21 16:15:54 weixin_44189883 阅读数 222

同步编程

while(1)
{
    epoll_wait(...)
    for(;;)
    {
        if (fd == listenner_socket)
        {
            cfd = accpt(listenner_socket);
        }
        else
        {
            read(fd, buf, size);
            process(buf);
        }
    }
}

示例如下:
https://github.com/zhiyong0804/net_io/blob/master/epoll/epollsvr.c

缺点:

  1. 所有的处理都放在同一个线程里,这个线程的压力很大,因为网络IO的处理总是比CPU要慢很多;
  2. 同时如果这里有一个客户端的请求,处理比较复杂,则会影响后面其它客户端的请求的响应时间;

半异步半同步编程

T1线程:

while(1)
{
    epoll_wait(...)
    for(;;)
    {
        if (fd == listenner_socket)
        {
            cfd = accpt(listenner_socket);
        }
        else
        {
            read(fd, buf, size);
            enqueue(buf);
        }
    }
}

T2线程:

while (1)
{
    wait_queue(buf);
    process(buf);
}

示例如:
TODO

缺点:
1)线程间需要同步;
2)线程间有数据的拷贝(memcpy),这个拷贝也是很耗CPU的;
3)epoll所在的线程要处理所有的网络IO的读和写,这个线程的压力远远超过其余的业务处理线程。

纯异步 – Reactor设计

先来看libevent库

https://github.com/zhiyong0804/libevent_helloworld/blob/master/libevent_server.c

Reactor设计模式

Reactor设计模式

  • Handles :表示操作系统管理的资源,我们可以理解为fd。
  • Synchronous Event Demultiplexer :同步事件分离器,阻塞等待Handles中的事件发生。
  • Initiation Dispatcher :初始分派器,作用为添加Event handler(事件处理器)、删除Event handler以及分派事件给Event handler。也就是说,Synchronous Event Demultiplexer负责等待新事件发生,事件发生时通知Initiation Dispatcher,然后Initiation Dispatcher调用event handler处理事件。
  • Event Handler :事件处理器的接口
  • Concrete Event Handler :事件处理器的实际实现,而且绑定了一个Handle。因为在实际情况中,我们往往不止一种事件处理器,因此这里将事件处理器接口和实现分开,与C++、Java这些高级语言中的多态类似。

Reactor与多线程的结合

可以参考EasyDarwin里使用Reactor设计模式与多线程的结合
reactor与多线程结合

优点:
1)epoll_wait说在的线程只需要监听网络的事件发生,如socket的可读可写事件(EV_RE / EV_WR)然后封装成一个event,继而封装到一个Task。
2)把这个Task按照某种策略(轮询式的负载均衡或者固定到某个线程)压入线程池中某个线程的任务队列里,平摊了IO的性能和处理请求的均衡,提升总体效率;
3)设计变得更加具有模块化和可扩展性,对epoll反应堆没有任何影响;
4)并发实现,如果线程再设置CPU的亲缘性,则更加提高了网络性能。

-- The End --
**可以扫我一起讨论技术**

扫我 扫我

ajax请求例子 react 相关内容

他能够充分利用多核CPU

博文 来自: u011676589
没有更多推荐了,返回首页