muduo reactor

2010-08-29 23:38:00 Solstice 阅读数 88346

发布一个基于 Reactor 模式的 C++ 网络库

陈硕 (giantchen_AT_gmail)

Blog.csdn.net/Solstice

2010 Aug 30

本文主要介绍 muduo 网络库的使用。其设计与实现将有另文讲解。

目录

由来 1

下载与编译 2

例子 2

基本结构 3

公开接口 4

内部实现 4

线程模型 5

结语 5

由来

半年前我写了一篇《学之者生,用之者死——ACE历史与简评》,其中提到“我心目中理想的网络库”的样子:

  • 线程安全,支持多核多线程
  • 不考虑可移植性,不跨平台,只支持 Linux,不支持 Windows。
  • 在不增加复杂度的前提下可以支持 FreeBSD/Darwin,方便将来用 Mac 作为开发用机,但不为它做性能优化。也就是说 IO multiplexing 使用 poll 和 epoll。
  • 主要支持 x86-64,兼顾 IA32
  • 不支持 UDP,只支持 TCP
  • 不支持 IPv6,只支持 IPv4
  • 不考虑广域网应用,只考虑局域网
  • 只支持一种使用模式:non-blocking IO + one event loop per thread,不考虑阻塞 IO
  • API 简单易用,只暴露具体类和标准库里的类,不使用 non-trivial templates,也不使用虚函数
  • 只满足常用需求的 90%,不面面俱到,必要的时候以 app 来适应 lib
  • 只做 library,不做成 framework
  • 争取全部代码在 5000 行以内(不含测试)
  • 以上条件都满足时,可以考虑搭配 Google Protocol Buffers RPC

在想清楚这些目标之后,我开始第三次尝试编写自己的 C++ 网络库。与前两次不同,这次我一开始就想好了库的名字,叫 muduo (木铎),并在 Google code 上创建了项目: http://code.google.com/p/muduo/ 。muduo 的主体内容在 5 月底已经基本完成,现在我把它开源。

本文主要介绍 muduo 网络库的使用,其设计与实现将有另文讲解。

下载与编译

下载地址: http://muduo.googlecode.com/files/muduo-0.1.0-alpha.tar.gz

SHA1 Checksum: 5d3642e311177ded89ed0d15c10921738f8c984c

Muduo 使用了 Linux 较新的系统调用,要求 Linux 的内核版本大于 2.6.28 (我自己用的是 2.6.32 )。在 Debian Squeeze / Ubuntu 10.04 LTS 上编译测试通过,32 位和 64 位系统都能使用。

Muduo 采用 CMake 为 build system,安装方法:

$ sudo apt-get install cmake

Muduo 依赖 Boost,很容易安装:

$ sudo apt-get install libboost1.40-dev # 或 libboost1.42-dev

编译方法很简单:

$ tar zxf muduo-0.1.0-alpha.tar.gz

$ cd muduo/

$ ./build.sh

# 编译生成的可执行文件和静态库文件分别位于 ../build/debug/{bin,lib}

如果要编译 release 版,可执行

$ BUILD_TYPE=release ./build.sh

# 编译生成的可执行文件和静态库文件分别位于 ../build/release/{bin,lib}

编译完成之后请试运行其中的例子。比如 bin/inspector_test ,然后通过浏览器访问 http://10.0.0.10:12345/ 或 http://10.0.0.10:12345/proc/status,其中 10.0.0.10 替换为你的 Linux box 的 IP。

例子

Muduo 附带了几十个小例子,位于 examples 目录。其中包括从 Boost.Asio、JBoss Netty、Python Twisted 等处移植过来的例子。

examples

|-- simple # 简单网络协议的实现

|   |-- allinone  # 在一个程序里同时实现下面 5 个协议

|   |-- chargen   # RFC 864,可测试带宽

|   |-- daytime # RFC 867

|   |-- discard # RFC 863

|   |-- echo # RFC 862

|   |-- time # RFC 868

|   `-- timeclient # time 协议的客户端

|-- hub # 一个简单的 pub/sub/hub 服务,演示应用级的广播

|-- roundtrip # 测试两台机器的网络延时与时间差

|-- asio # 从 Boost.Asio 移植的例子

|   |-- chat # 聊天服务

|   `-- tutorial # 一系列 timers

|-- netty # 从 JBoss Netty 移植的例子

|   |-- discard # 可用于测试带宽,服务器可多线程运行

|   |-- echo # 可用于测试带宽,服务器可多线程运行

|   `-- uptime # TCP 长连接

`-- twisted # 从 Python Twisted 移植的例子

    `-- finger # finger01 ~ 07

基本结构

Muduo 的目录结构如下。

muduo

|-- base # 与网络无关的基础代码,已提前发布

`-- net # 网络库

    |-- http # 一个简单的可嵌入的 web 服务器

    |-- inspect # 基于以上 web 服务器的“窥探器”,用于报告进程的状态

    `-- poller # poll(2) 和 epoll(4) 两种 IO multiplexing 后端

Muduo 是基于 Reactor 模式的网络库,其核心是个事件循环 EventLoop,用于响应计时器和 IO 事件。Muduo 采用基于对象(object based)而非面向对象(object oriented)的设计风格,其接口多以 boost::function + boost::bind 表达

Muduo 的头文件明确分为客户可见和客户不可见两类。客户可见的为白底,客户不可见的为灰底。

inc

这里简单介绍各个头文件及 class 的作用,详细的介绍留给以后的博客。

公开接口
  • Buffer 仿 Netty ChannelBuffer 的 buffer class,数据的读写透过 buffer 进行
  • InetAddress 封装 IPv4 地址 (end point),注意,muduo 目前不能解析域名,只认 IP
  • EventLoop 反应器 Reactor,用户可以注册计时器回调
  • EventLoopThread 启动一个线程,在其中运行 EventLoop::loop()
  • TcpConnection 整个网络库的核心,封装一次 TCP 连接
  • TcpClient 用于编写网络客户端,能发起连接,并且有重试功能
  • TcpServer 用于编写网络服务器,接受客户的连接
  • 在这些类中,TcpConnection 的生命期依靠 shared_ptr 控制(即用户和库共同控制)。Buffer 的生命期由 TcpConnection 控制。其余类的生命期由用户控制。
  • HttpServer 和 Inspector,暴露出一个 http 界面,用于监控进程的状态,类似于 Java JMX。这么做的原因是,《程序员修炼之道》第 6 章第 34 条提到“对于更大、更复杂的服务器代码,提供其操作的内部试图的一种漂亮技术是使用内建的 Web 服务器”,Jeff Dean 也说“(每个 Google 的服务器进程)Export HTML-based status pages for easy diagnosis”。
内部实现
  • Channel 是 selectable IO channel,负责注册与响应 IO 事件,它不拥有 file descriptor。它是 Acceptor、Connector、EventLoop、TimerQueue、TcpConnection 的成员,生命期由后者控制。
  • Socket 封装一个 file descriptor,并在析构时关闭 fd。它是 Acceptor、TcpConnection 的成员,生命期由后者控制。EventLoop、TimerQueue 也拥有 fd,但是不封装为 Socket。
  • SocketsOps 封装各种 sockets 系统调用。
  • EventLoop 封装事件循环,也是事件分派的中心。它用 eventfd(2) 来异步唤醒,这有别于传统的用一对 pipe(2) 的办法。它用 TimerQueue 作为计时器管理,用 Poller 作为 IO Multiplexing。
  • Poller 是 PollPoller 和 EPollPoller 的基类,采用“电平触发”的语意。它是 EventLoop 的成员,生命期由后者控制。
  • PollPoller 和 EPollPoller 封装 poll(2) 和 epoll(4) 两种 IO Multiplexing 后端。Poll 的存在价值是便于调试,因为 poll(2) 调用是上下文无关的,用 strace 很容易知道库的行为是否正确。
  • Connector 用于发起 TCP 连接,它是 TcpClient 的成员,生命期由后者控制。
  • Acceptor 用于接受 TCP 连接,它是 TcpServer 的成员,生命期由后者控制。
  • TimerQueue 用 timerfd 实现定时,这有别于传统的设置 poll/epoll_wait 的等待时长的办法。为了简单起见,目前用链表来管理 Timer,如果有必要可改为优先队列,这样复杂度可从 O(n) 降为O(ln n) (某些操作甚至是 O(1))。它是 EventLoop 的成员,生命期由后者控制。
  • EventLoopThreadPool 用于创建 IO 线程池,也就是说把 TcpConnection 分派到一组运行 EventLoop 的线程上。它是 TcpServer 的成员,生命期由后者控制。

类图
muduo_class_diagram

线程模型

Muduo 的线程模型符合我主张的 one loop per thread + thread pool 模型。每个线程最多有一个 EventLoop。每个 TcpConnection 必须归某个 EventLoop 管理,所有的 IO 会转移到这个线程,换句话说一个 file descriptor 只能由一个线程读写。TcpConnection 所在的线程由其所属的 EventLoop 决定,这样我们可以很方便地把不同的 TCP 连接放到不同的线程去,也可以把一些 TCP 连接放到一个线程里。TcpConnection 和 EventLoop 是线程安全的,可以跨线程调用。TcpServer 直接支持多线程,它有两种模式:

1. 单线程,accept 与 TcpConnection 用同一个线程做 IO。

2. 多线程,accept 与 EventLoop 在同一个线程,另外创建一个 EventLoopThreadPool,新到的连接会按 round-robin 方式分配到线程池中。

结语

Muduo 是我对常见网络编程任务的总结,用它我能很容易地编写多线程的 TCP 服务器和客户端。Muduo 是我业余时间的作品,代码估计还有很多 bug,功能也不完善(例如不支持 signal 处理),待日后慢慢改进吧。

 

后记:性能测试:

1. muduo 与 boost asio 吞吐量对比

2. muduo 与 libevent2 吞吐量对比

3. 击鼓传花:对比 muduo 与 libevent2 的事件处理效率

2018-01-06 18:36:02 Tanswer_ 阅读数 1102

本文分析一下Reactor模式的实现,关键是三个类:Channel、Poller、EventLoop。

事件分发类 Channel

Channel 是 selectable IO channel,负责注册与响应IO事件,包括注册给Poller的 fd 及其监听的事件,以及事件发生了所调的回调函数。
每个Channel对象自始至终只负责一个 fd 的事件分发,封装了一系列该 fd 对应的操作,使用了回调函数,包括可读、可写、关闭和错误处理四个。
首先给定Channel所属的 loop,及其要处理的 fd;接着注册 fd 上需要监听的事件,如果是常用的读写事件的话,可以直接调用接口函数enableReading或enableWriting来注册对应fd上的事件,disable*是销毁指定的事件;然后通过 set*Callback 来设置事件发生时的回调。
注册事件时函数调用关系,如下:Channel::update()->EventLoop::updateChannel(Channel*)->Poller::updateChannel(Channel*),最终向 poll 系统调用的监听事件表注册或修改事件。

Channel.h

#ifndef MUDUO_NET_CHANNEL_H
#define MUDUO_NET_CHANNEL_H

#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>

#include <muduo/base/Timestamp.h>

namespace muduo
{
namespace net
{

class EventLoop;

///
/// A selectable I/O channel.
///
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
/* 事件分发类,主要包括 fd fd监听的事件、事件回调函数 */
class Channel : boost::noncopyable
{
 public:
  /* 事件回调函数模板 */
  typedef boost::function<void()> EventCallback;
  /* 读操作回调函数,需要传入时间 */
  typedef boost::function<void(Timestamp)> ReadEventCallback;

  /* 
   * 一个Channel只负责一个fd,但Channel不拥有fd
   * EventLoop调用Poller监听事件集合,就绪的事件元素就是Channel
   * Channel不仅是返回就绪事件,还可以处理事件
   */
  Channel(EventLoop* loop, int fd);
  ~Channel();

  /* 
   * Channel的核心
   * 处理事件,一般由Poller通过EventLoop来调用
   * 当fd对应的事件就绪后Channel::handleEvent()执行相应的事件回调
   * 如可读事件执行 readCallback_()
   */
  void handleEvent(Timestamp receiveTime);

  /* 设置四种回调函数 */
  void setReadCallback(const ReadEventCallback& cb)
  { readCallback_ = cb; }
  void setWriteCallback(const EventCallback& cb)
  { writeCallback_ = cb; }
  void setCloseCallback(const EventCallback& cb)
  { closeCallback_ = cb; }
  void setErrorCallback(const EventCallback& cb)
  { errorCallback_ = cb; }
#ifdef __GXX_EXPERIMENTAL_CXX0X__
  /* C++11版本 右值语义 */
  void setReadCallback(ReadEventCallback&& cb)
  { readCallback_ = std::move(cb); }
  void setWriteCallback(EventCallback&& cb)
  { writeCallback_ = std::move(cb); }
  void setCloseCallback(EventCallback&& cb)
  { closeCallback_ = std::move(cb); }
  void setErrorCallback(EventCallback&& cb)
  { errorCallback_ = std::move(cb); }
#endif

  /// Tie this channel to the owner object managed by shared_ptr,
  /// prevent the owner object being destroyed in handleEvent.
  void tie(const boost::shared_ptr<void>&);

  /* 返回该Channel负责的fd*/
  int fd() const { return fd_; }
  /* 返回 fd  注册的事件 */
  int events() const { return events_; }
  /* 
   * 进行poll或者epoll_wait后,根据fd的返回事件调用此函数,设定fd的就绪事件类型
   * handleEvent 根据就绪事件类型(revents_)来决定执行哪个事件回调函数
   */
  void set_revents(int revt) { revents_ = revt; } // used by pollers
  // int revents() const { return revents_; }
  /* 判断fd是不是 没有 事件监听 */
  bool isNoneEvent() const { return events_ == kNoneEvent; }

  /* update 通过eventloop 去更新epoll中fd的监听事件 */
  /* fd 注册可读事件 */
  void enableReading() { events_ |= kReadEvent; update(); }
  /* 销毁读事件 */
  void disableReading() { events_ &= ~kReadEvent; update(); }
  /* fd 注册可写事件 */
  void enableWriting() { events_ |= kWriteEvent; update(); }
  /* 销毁写事件 */
  void disableWriting() { events_ &= ~kWriteEvent; update(); }
  /* 停止监听所有事件 */
  void disableAll() { events_ = kNoneEvent; update(); }
  /* 是否注册了读写事件 */
  bool isWriting() const { return events_ & kWriteEvent; }
  bool isReading() const { return events_ & kReadEvent; }

  // for Poller
  // 还不懂
  int index() { return index_; }
  void set_index(int idx) { index_ = idx; }

  // for debug
  string reventsToString() const;
  string eventsToString() const;

  void doNotLogHup() { logHup_ = false; }

  /* 返回持有本Channel的EventLoop 指针 */
  EventLoop* ownerLoop() { return loop_; }
  /* 将Channel 从EventLoop中移除 */
  void remove();

 private:
  static string eventsToString(int fd, int ev);

  /* 通过调用loop_->updateChannel()来注册或改变本fd在epoll中监听的事件 */
  void update();
  void handleEventWithGuard(Timestamp receiveTime);

  static const int kNoneEvent;  //无事件
  static const int kReadEvent;  //可读事件
  static const int kWriteEvent; //可写事件

  EventLoop* loop_;             //本Channel所属的EventLoop
  const int  fd_;               //本Channel负责的文件描述符,Channel不拥有fd
  int        events_;           //fd 注册的事件
  int        revents_;          //通过poll返回的就绪事件类型 
  int        index_;            //被Poller使用的下标 used by Poller.
  bool       logHup_;           //是否生成某些日志

  boost::weak_ptr<void> tie_;
  bool tied_;
  bool eventHandling_;          //是否正在处理事件
  bool addedToLoop_;
  /* 四种回调函数,使用boost提供的function模板*/
  ReadEventCallback readCallback_;  //读事件回调函数
  EventCallback writeCallback_;     //写事件回调函数
  EventCallback closeCallback_;     //关闭事件回调函数
  EventCallback errorCallback_;     //错误事件回调函数
};

}
}
#endif  // MUDUO_NET_CHANNEL_H

Channel.cc

/* 事件 */
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

Channel::Channel(EventLoop* loop, int fd__)
  : loop_(loop),
    fd_(fd__),
    events_(0),
    revents_(0),
    index_(-1),
    logHup_(true),
    tied_(false),
    eventHandling_(false),
    addedToLoop_(false)
{
}

Channel::~Channel()
{
  assert(!eventHandling_);
  assert(!addedToLoop_);
  if (loop_->isInLoopThread())
  {
    assert(!loop_->hasChannel(this));
  }
}
/* 通过调用loop中的函数来改变对应fd在epoll中监听的事件 */
void Channel::update()
{
  addedToLoop_ = true;
  /* loop中又会去调用Poller中的函数来实现 */
  loop_->updateChannel(this);
}

/* 和上面类似,通过EventLoop 从epoll/poll 中删除fd*/
void Channel::remove()
{
  assert(isNoneEvent());
  addedToLoop_ = false;
  /* EventLoop 会调用Poller中的函数 */
  loop_->removeChannel(this);
}

void Channel::handleEvent(Timestamp receiveTime)
{
  boost::shared_ptr<void> guard;
  if (tied_)
  {
    guard = tie_.lock();
    if (guard)
    {
      handleEventWithGuard(receiveTime);
    }
  }
  else
  {
    handleEventWithGuard(receiveTime);
  }
}

/* 处理各种事件 */
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  LOG_TRACE << reventsToString();
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))  //错误事件处理
  {
    if (errorCallback_) errorCallback_();
  }
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))    //可读
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  if (revents_ & POLLOUT)   //可写
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}

IO multiplexing 类 Poller

Poller 类是IO复用类的基类,muduo 同时支持poll 和 epoll 两种IO multiplexing 机制,它有两个PollPoller 和 EpollPoller 两个子类,内部分别采用 poll 和 epoll 实现。它的职责仅仅是IO复用,事件分发交给 Channel 完成,生命期和 EventLoop 一样长。
拿 epoll 对 Poller 的实现来说,基本是 epoll 功能的封装,poll 函数调用 epoll_wait 来监听注册了的文件描述符,将返回的就绪事件装入 activeChannels 数组,还可以控制 channel 中事件的增删改。

Poller.h

#ifndef MUDUO_NET_POLLER_H
#define MUDUO_NET_POLLER_H

#include <map>
#include <vector>
#include <boost/noncopyable.hpp>

#include <muduo/base/Timestamp.h>
#include <muduo/net/EventLoop.h>

namespace muduo
{
namespace net
{

class Channel;

///
/// Base class for IO Multiplexing
///
/// This class doesn't own the Channel objects.
class Poller : boost::noncopyable //不拥有Channel
{
 public:
  typedef std::vector<Channel*> ChannelList;
  /* 用于返回就绪事件集合 */
  Poller(EventLoop* loop);
  virtual ~Poller();

  /// Polls the I/O events.
  /// Must be called in the loop thread.
  /* Poller的核心功能,将就绪事件加入到 activeChannels 中 */
  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;

  /// Changes the interested I/O events.
  /// Must be called in the loop thread.
  /* 更新 fd 的监听事件
   * Channel::update()->EventLoop::updateChannel(Channel* channel)->Poller::updateChannel(Channel* channel)
   */
  virtual void updateChannel(Channel* channel) = 0;

  /// Remove the channel, when it destructs.
  /// Must be called in the loop thread.
  /* 从poll/epoll 中移除fd 停止监听此fd 
   * EventLoop::removeChannel(Channel*)->Poller::removeChannel(Channel*)
   */
  virtual void removeChannel(Channel* channel) = 0;
  /* 判断该poll//epoll 模型是否监听了Channel对应的fd */
  virtual bool hasChannel(Channel* channel) const;

  /* */
  static Poller* newDefaultPoller(EventLoop* loop);

  /* 断言 确保没有跨线程 */
  void assertInLoopThread() const
  {
    ownerLoop_->assertInLoopThread();
  }

 protected:
  /*
   * 记录fd到Channel的对应关系 
   * 底层的epoll每次监听完fd,要根据这个映射关系去寻找对应的Channel
   */
  typedef std::map<int, Channel*> ChannelMap;
  ChannelMap channels_;//保存epoll监听的fd,及其对应的Channel指针

 private:
  /* 这个Poller对象所属的 EventLoop */
  EventLoop* ownerLoop_;
};

}
}
#endif  // MUDUO_NET_POLLER_H

epoll对Poller的实现 :poller/EPollPoller.h

#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H

#include <muduo/net/Poller.h>

#include <vector>

struct epoll_event;

namespace muduo
{
namespace net
{

///
/// IO Multiplexing with epoll(4).
///
class EPollPoller : public Poller
{
 public:
  EPollPoller(EventLoop* loop);
  virtual ~EPollPoller();

  /* 内部调用 epoll_wait,初始化对应的channel,加入到activeChannels */
  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);
  virtual void updateChannel(Channel* channel);
  virtual void removeChannel(Channel* channel);

 private:
  static const int kInitEventListSize = 16;

  static const char* operationToString(int op);

  /* 将epoll_wait 返回的活跃事件填充到activeChannels */
  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;
  /* 更改 channel ,调用epoll_ctl */
  void update(int operation, Channel* channel);

  typedef std::vector<struct epoll_event> EventList;

  int epollfd_;         //epollfd
  EventList events_;    //epoll事件数组
};

}
}
#endif  // MUDUO_NET_POLLER_EPOLLPOLLER_H

EventLoop 类

EventLoop类是Reactor模式的核心,一个线程一个事件循环,即one loop per thread,EventLoop 对象的生命周期通常与其所属的线程一样长。其主要功能是运行事件循环,等待事件发生,然后调用回调处理发生的事件。EventLoop::loop() -> Poller::poll() 填充就绪事件集合 activeChannels,然后遍历该容器,执行每个 channel 的 Channel::handleEvent() 完成对应就绪事件回调。

EventLoop.h

#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H

#include <vector>

#include <boost/any.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>

#include <muduo/base/Mutex.h>
#include <muduo/base/CurrentThread.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/TimerId.h>

namespace muduo
{
namespace net
{

class Channel;  //前向声明,事件分发器主要用于事件注册与回调
class Poller;   //IO复用类,监听事件集合,即 epoll /poll 的功能
class TimerQueue;

///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
/* EventLoop 是不可拷贝的 muduo中的大多数class都是不可拷贝的 */
class EventLoop : boost::noncopyable
{
 public:
  typedef boost::function<void()> Functor;  //回调函数

  EventLoop();
  ~EventLoop();  // force out-line dtor, for scoped_ptr members.

  ///
  /// Loops forever.
  ///
  /// Must be called in the same thread as creation of the object.
  ///
  /* 
   * IO线程创建了EventLoop对象,是这个类的核心接口
   * 用来启动事件循环 
   * EventLoop::loop()->Poller::Poll()获得就绪的事件集合
   * 再通过Channel::handleEvent()执行就绪事件回调
   */
  void loop();

  /// Quits loop.
  ///
  /// This is not 100% thread safe, if you call through a raw pointer,
  /// better to call through shared_ptr<EventLoop> for 100% safety.
  //终止事件循环
  void quit();

  ///
  /// Time when poll returns, usually means data arrival.
  ///
  Timestamp pollReturnTime() const { return pollReturnTime_; }

  int64_t iteration() const { return iteration_; }

  void runInLoop(const Functor& cb);

  void queueInLoop(const Functor& cb);

  size_t queueSize() const;

#ifdef __GXX_EXPERIMENTAL_CXX0X__
  void runInLoop(Functor&& cb);
  void queueInLoop(Functor&& cb);
#endif

  // timers

  //在某个绝对时间点执行定时回调
  TimerId runAt(const Timestamp& time, const TimerCallback& cb);
  ///
  ///
  //相对时间 执行定时回调
  TimerId runAfter(double delay, const TimerCallback& cb);
  ///
  //每隔interval执行定时回调
  TimerId runEvery(double interval, const TimerCallback& cb);
  ///
  ///
  //删除某个定时器
  void cancel(TimerId timerId);

#ifdef __GXX_EXPERIMENTAL_CXX0X__
  TimerId runAt(const Timestamp& time, TimerCallback&& cb);
  TimerId runAfter(double delay, TimerCallback&& cb);
  TimerId runEvery(double interval, TimerCallback&& cb);
#endif

  // internal usage
  // 唤醒IO线程
  void wakeup();
  // 更新某个事件分发器
  // 调用poller->updateChannel(channel)完成 fd 向事件集合注册事件及事件回调函数
  void updateChannel(Channel* channel);
  // 删除某个事件分发器
  void removeChannel(Channel* channel);
  bool hasChannel(Channel* channel);

  // pid_t threadId() const { return threadId_; }
  void assertInLoopThread()
  {
    if (!isInLoopThread())  //若运行线程不拥有EventLoop则退出,保证one loop per thread
    {
      abortNotInLoopThread();
    }
  }
  /* 判断当前线程是否为拥有此 EventLoop 的线程 */
  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
  // bool callingPendingFunctors() const { return callingPendingFunctors_; }
  bool eventHandling() const { return eventHandling_; }

  void setContext(const boost::any& context)
  { context_ = context; }

  const boost::any& getContext() const
  { return context_; }

  boost::any* getMutableContext()
  { return &context_; }

  /* 返回此线程的EventLoop对象 */
  static EventLoop* getEventLoopOfCurrentThread();

 private:
  /* 在不拥有EventLoop 线程中终止 */
  void abortNotInLoopThread();
  /* wakeupFd_ 上可读事件回调 */
  void handleRead();  // waked up
  /* 执行队列pendingFunctors 中的用户任务回调 */
  void doPendingFunctors();

  void printActiveChannels() const; // DEBUG

  typedef std::vector<Channel*> ChannelList;

  bool looping_; /* atomic */ //运行标志
  bool quit_; /* atomic and shared between threads, okay on x86, I guess. */ //退出循环标志
  bool eventHandling_; /* atomic */
  bool callingPendingFunctors_; /* atomic *///是否正在执行用户任务回调
  int64_t iteration_;
  const pid_t threadId_;    //EventLoop 的附属线程ID
  Timestamp pollReturnTime_;
  boost::scoped_ptr<Poller> poller_;        //多路复用类Poller
  boost::scoped_ptr<TimerQueue> timerQueue_;//定时器队列用于存放定时器

  int wakeupFd_;    //eventfd返回的eventfd,用于唤醒EventLoop所在的线程
  // unlike in TimerQueue, which is an internal class,
  // we don't expose Channel to client.
  // 通过wakeupChannel_观察wakeupFd_上的可读事件
  // 当可读表明需要唤醒EventLoop所在线程执行用户回调
  boost::scoped_ptr<Channel> wakeupChannel_;
  boost::any context_;

  // scratch variables
  ChannelList activeChannels_;      //活跃的事件集合,类似epoll的就绪事件集合
  Channel* currentActiveChannel_;   //当前活跃的事件

  mutable MutexLock mutex_;
  std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_ //存放用户任务回调
};

}
}
#endif  // MUDUO_NET_EVENTLOOP_H

下面主要看一下 loop() 和 quit() 的实现:

/* 主循环,监听事件集合,执行就绪事件的处理函数 */
void EventLoop::loop()
{
  assert(!looping_);
  assertInLoopThread();
  looping_ = true;
  quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
  LOG_TRACE << "EventLoop " << this << " start looping";

  while (!quit_)
  {
    activeChannels_.clear();
    /* activeChannels_ 为就绪事件集合 */
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    ++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority
    eventHandling_ = true;
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      currentActiveChannel_ = *it;  //取一个就绪事件
      currentActiveChannel_->handleEvent(pollReturnTime_);  //执行相应事件回调
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
    doPendingFunctors();
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

void EventLoop::quit()
{
  quit_ = true;     //设置标志位,有延迟不会马上停止循环,当下次检查while(!quit_)时起效
  // There is a chance that loop() just executes while(!quit_) and exits,
  // then EventLoop destructs, then we are accessing an invalid object.
  // Can be fixed using mutex_ in both places.
  if (!isInLoopThread())
  {
    wakeup();   //其他线程唤醒 EventLoop线程并终止
  }
}

主要是先了解 Reactor 模式的关键结构,所以上面注释的内容是 muduo 的 Reactor 模式的核心内容,一些细节没有详细说明。流程图如下:
这里写图片描述

2019-03-04 22:20:24 qq_36616692 阅读数 336

作者原文:https://blog.csdn.net/Solstice/article/details/5848547#_Toc17667

相关的名词解释:

1.一个基于 Reactor 模式的 C++ 网络库
Reactor 模式:反应器设计模式,处理并发服务请求,并将请求提交到一个或
者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,
由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。
(用单线程做多线程的事)

2. non-trivial templates
Trivial(平凡)和Non-Trivial(不平凡)是对于class(类)的或者类中的四个函数而言的:
1.构造函数
2.拷贝构造函数
3.赋值函数
4.析构函数
Non-Trivial如果上面四种函数满足以下三点任意一项或一项以上:
1.有基类
2.显式(explict)定义了以上四种函数一种或以上
3.类里有非静态非POD的数据成员
那么该类就是Non-Trivial(不平凡)有意义的,所以有意义(Non-Trivial)的函数都有一些“多余”的操作,和系统自动创建的默认缺省函数有些差别。而有意义(Non-Trivial)的类则是含有Non-Trivial的函数。

3.IA32 
Intel Architecture 32bit英特尔32位体系架构
4.FreeBSD
一种类UNIX操作系统,内部结构和系统API上和UNIX有很大的兼容性。
5.Darwin
苹果电脑于2000年所释出的一个开放原始码操作系统。Darwin 是MacOSX 操作环境的操作系统成份

6.I/O模式
-- 阻塞 I/O(blocking IO)
-- 非阻塞 I/O(nonblocking IO)
-- I/O 多路复用( IO multiplexing)
-- 信号驱动 I/O( signal driven IO)
-- 异步 I/O(asynchronous IO)
7.IO multiplexing有三种复用方式
select()
poll() IO多路复用
epoll IO多路复用
参考:https://blog.csdn.net/davidsguo008/article/details/73556811

8.基于对象和面向对象的区别
只有完全具有封装、继承、多态三大特点的才能够叫做面向对象,否则即使设计中蕴含了一些对象的概念,也顶多称为基于对象。
通常基于对象是使用对象,意味着它们有像C++的结构加函数这样的对象,然而这只是到达面向对象语言的一部分,停留在把函数捆绑在结构内部的语言是基于对象的。但是无法利用现有的对象模板产生新的对象类型,继而产生新的对象,也就是说基于对象一般没有继承的特点。没有了继承的概念也就无从谈论多态。现在的很多流行技术都是基于对象的,它们使用一些封装好的对象,调用对象的方法,设置对象的属性。但是它们无法让程序员派生新对象类型。他们只能使用现有对象的方法和属性。所以当你判断一个新的技术是否是面向对象的时候,通常可以使用后两个特性来加以判断。例如:C++是面向对象的,而VB只是基于对象的。
ADT(Abstract Data Type)是指一个数学模型以及定义在该模型上的一组操作。ADT包括数据数据元素,数据关系以及相关的操作。

头文件分类:白色用户可见,灰色不可见

9.non-trivial有用的
面向对象与基于对象:
library和framework的区别:
library中类相对比较独立,编写时需要编写一些“胶水代码”来粘合
framework是能够运用于特定应用领域的,不需要编写过多的“胶水代码”来粘合
重要区别:框架提供用户注册的一些回调函数,使得框架能够调用我们所编写的回调函数,这就使得控制反转了。

.cc文件是Linux/Unix下为C++源文件的默认扩展名,与.cpp(windows下)一个意思

用GCC/G++在 Linux/Unix下可以打开和编译。用一般的记事本就可以打开和编辑。

 

面向对象/基于对象

基于对象的效率稍高一点点

2010-10-13 22:18:00 iteye_11002 阅读数 49


本文主要介绍 muduo 网络库的使用。其设计与实现将有另文讲解。

目录

由来 1

下载与编译 2

例子 2

基本结构 3

公开接口 4

内部实现 4

线程模型 5

结语 5

由来
半年前我写了一篇《学之者生,用之者死——ACE历史与简评》,其中提到“我心目中理想的网络库”的样子:

线程安全,支持多核多线程
不考虑可移植性,不跨平台,只支持 Linux,不支持 Windows。
在不增加复杂度的前提下可以支持 FreeBSD/Darwin,方便将来用 Mac 作为开发用机,但不为它做性能优化。也就是说 IO multiplexing 使用 poll 和 epoll。
主要支持 x86-64,兼顾 IA32
不支持 UDP,只支持 TCP
不支持 IPv6,只支持 IPv4
不考虑广域网应用,只考虑局域网
只支持一种使用模式:non-blocking IO + one event loop per thread,不考虑阻塞 IO
API 简单易用,只暴露具体类和标准库里的类,不使用 non-trivial templates,也不使用虚函数
只满足常用需求的 90%,不面面俱到,必要的时候以 app 来适应 lib
只做 library,不做成 framework
争取全部代码在 5000 行以内(不含测试)
以上条件都满足时,可以考虑搭配 Google Protocol Buffers RPC
在想清楚这些目标之后,我开始第三次尝试编写自己的 C++ 网络库。与前两次不同,这次我一开始就想好了库的名字,叫 muduo (木铎),并在 Google code 上创建了项目: http://code.google.com/p/muduo/ 。muduo 的主体内容在 5 月底已经基本完成,现在我把它开源。

本文主要介绍 muduo 网络库的使用,其设计与实现将有另文讲解。

下载与编译
下载地址: http://muduo.googlecode.com/files/muduo-0.1.0-alpha.tar.gz

SHA1 Checksum: 5d3642e311177ded89ed0d15c10921738f8c984c

Muduo 使用了 Linux 较新的系统调用,要求 Linux 的内核版本大于 2.6.28 (我自己用的是 2.6.32 )。在 Debian Squeeze / Ubuntu 10.04 LTS 上编译测试通过,32 位和 64 位系统都能使用。

Muduo 采用 CMake 为 build system,安装方法:

$ sudo apt-get install cmake

Muduo 依赖 Boost,很容易安装:

$ sudo apt-get install libboost1.40-dev # 或 libboost1.42-dev

编译方法很简单:

$ tar zxf muduo-0.1.0-alpha.tar.gz

$ cd muduo/

$ ./build.sh

# 编译生成的可执行文件和静态库文件分别位于 ../build/debug/{bin,lib}

如果要编译 release 版,可执行

$ BUILD_TYPE=release ./build.sh

# 编译生成的可执行文件和静态库文件分别位于 ../build/release/{bin,lib}

编译完成之后请试运行其中的例子。比如 bin/inspector_test ,然后通过浏览器访问 http://10.0.0.10:12345/http://10.0.0.10:12345/proc/status,其中 10.0.0.10 替换为你的 Linux box 的 IP。

例子
Muduo 附带了几十个小例子,位于 examples 目录。其中包括从 Boost.Asio、JBoss Netty、Python Twisted 等处移植过来的例子。

examples

|-- simple # 简单网络协议的实现

| |-- allinone # 在一个程序里同时实现下面 5 个协议

| |-- chargen # RFC 864,可测试带宽

| |-- daytime # RFC 867

| |-- discard # RFC 863

| |-- echo # RFC 862

| |-- time # RFC 868

| `-- timeclient # time 协议的客户端

|-- hub # 一个简单的 pub/sub/hub 服务,演示应用级的广播

|-- roundtrip # 测试两台机器的网络延时与时间差

|-- asio # 从 Boost.Asio 移植的例子

| |-- chat # 聊天服务

| `-- tutorial # 一系列 timers

|-- netty # 从 JBoss Netty 移植的例子

| |-- discard # 可用于测试带宽,服务器可多线程运行

| |-- echo # 可用于测试带宽,服务器可多线程运行

| `-- uptime # TCP 长连接

`-- twisted # 从 Python Twisted 移植的例子

`-- finger # finger01 ~ 07

基本结构
Muduo 的目录结构如下。

muduo

|-- base # 与网络无关的基础代码,已提前发布

`-- net # 网络库

|-- http # 一个简单的可嵌入的 web 服务器

|-- inspect # 基于以上 web 服务器的“窥探器”,用于报告进程的状态

`-- poller # poll(2) 和 epoll(4) 两种 IO multiplexing 后端

Muduo 是基于 Reactor 模式的网络库,其核心是个事件循环 EventLoop,用于响应计时器和 IO 事件。Muduo 采用基于对象(object based)而非面向对象(object oriented)的设计风格,其接口多以 boost::function + boost::bind 表达。

Muduo 的头文件明确分为客户可见和客户不可见两类。客户可见的为白底,客户不可见的为灰底。

这里简单介绍各个头文件及 class 的作用,详细的介绍留给以后的博客。

公开接口
Buffer 仿 Netty ChannelBuffer 的 buffer class,数据的读写透过 buffer 进行
InetAddress 封装 IPv4 地址 (end point),注意,muduo 目前不能解析域名,只认 IP
EventLoop 反应器 Reactor,用户可以注册计时器回调
EventLoopThread 启动一个线程,在其中运行 EventLoop::loop()
TcpConnection 整个网络库的核心,封装一次 TCP 连接
TcpClient 用于编写网络客户端,能发起连接,并且有重试功能
TcpServer 用于编写网络服务器,接受客户的连接
在这些类中,TcpConnection 的生命期依靠 shared_ptr 控制(即用户和库共同控制)。Buffer 的生命期由 TcpConnection 控制。其余类的生命期由用户控制。
HttpServer 和 Inspector,暴露出一个 http 界面,用于监控进程的状态,类似于 Java JMX。这么做的原因是,《程序员修炼之道》第 6 章第 34 条提到“对于更大、更复杂的服务器代码,提供其操作的内部试图的一种漂亮技术是使用内建的 Web 服务器”,Jeff Dean 也说“(每个 Google 的服务器进程)Export HTML-based status pages for easy diagnosis”。
内部实现
Channel 是 selectable IO channel,负责注册与响应 IO 事件,它不拥有 file descriptor。它是 Acceptor、Connector、EventLoop、TimerQueue、TcpConnection 的成员,生命期由后者控制。
Socket 封装一个 file descriptor,并在析构时关闭 fd。它是 Acceptor、TcpConnection 的成员,生命期由后者控制。EventLoop、TimerQueue 也拥有 fd,但是不封装为 Socket。
SocketsOps 封装各种 sockets 系统调用。
EventLoop 封装事件循环,也是事件分派的中心。它用 eventfd(2) 来异步唤醒,这有别于传统的用一对 pipe(2) 的办法。它用 TimerQueue 作为计时器管理,用 Poller 作为 IO Multiplexing。
Poller 是 PollPoller 和 EPollPoller 的基类,采用“电平触发”的语意。它是 EventLoop 的成员,生命期由后者控制。
PollPoller 和 EPollPoller 封装 poll(2) 和 epoll(4) 两种 IO Multiplexing 后端。Poll 的存在价值是便于调试,因为 poll(2) 调用是上下文无关的,用 strace 很容易知道库的行为是否正确。
Connector 用于发起 TCP 连接,它是 TcpClient 的成员,生命期由后者控制。
Acceptor 用于接受 TCP 连接,它是 TcpServer 的成员,生命期由后者控制。
TimerQueue 用 timerfd 实现定时,这有别于传统的设置 poll/epoll_wait 的等待时长的办法。为了简单起见,目前用链表来管理 Timer,如果有必要可改为优先队列,这样复杂度可从 O(n) 降为O(ln n) (某些操作甚至是 O(1))。它是 EventLoop 的成员,生命期由后者控制。
EventLoopThreadPool 用于创建 IO 线程池,也就是说把 TcpConnection 分派到一组运行 EventLoop 的线程上。它是 TcpServer 的成员,生命期由后者控制。
类图

线程模型
Muduo 的线程模型符合我主张的 one loop per thread + thread pool 模型。每个线程最多有一个 EventLoop。每个 TcpConnection 必须归某个 EventLoop 管理,所有的 IO 会转移到这个线程,换句话说一个 file descriptor 只能由一个线程读写。TcpConnection 所在的线程由其所属的 EventLoop 决定,这样我们可以很方便地把不同的 TCP 连接放到不同的线程去,也可以把一些 TCP 连接放到一个线程里。TcpConnection 和 EventLoop 是线程安全的,可以跨线程调用。TcpServer 直接支持多线程,它有两种模式:

1. 单线程,accept 与 TcpConnection 用同一个线程做 IO。

2. 多线程,accept 与 EventLoop 在同一个线程,另外创建一个 EventLoopThreadPool,新到的连接会按 round-robin 方式分配到线程池中。

结语
Muduo 是我对常见网络编程任务的总结,用它我能很容易地编写多线程的 TCP 服务器和客户端。Muduo 是我业余时间的作品,代码估计还有很多 bug,功能也不完善(例如不支持 signal 处理),待日后慢慢改进吧。

后记:性能测试:

1. muduo 与 boost asio 吞吐量对比

2. muduo 与 libevent2 吞吐量对比

3. 击鼓传花:对比 muduo 与 libevent2 的事件处理效率


本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/Solstice/archive/2010/08/29/5848547.aspx

2015-10-18 18:18:00 weixin_30824599 阅读数 44

一、简介

Muduo(木铎)是基于 Reactor 模式的网络库。

二、安装

从github库下载源码安装:https://github.com/chenshuo/muduo

muduo依赖了很多的库,所以在安装muduo之前应该安装这些库,包括:curl、c-ares、protobuf。

前面两个在centos6.2可以使用yum安装,protobuf只能源码安装了,参见这篇文章。

yum install libcurl-devel
yum install c-ares-devel

在muduo目录下执行:./build.sh -j2

中间遇到了一些问题:

在net/protorpc/RpcChannel.cc文件中NewCallback未定义,以及examples/protobuf/rpc/client.cc文件中NewCallback未定义,最后在github上提了个issue给硕哥,经过他的指导注释了一下CMakfiles的内容,然后编译通过。

https://github.com/chenshuo/muduo/blob/master/examples/CMakeLists.txt#L39

https://github.com/chenshuo/muduo/blob/master/muduo/net/CMakeLists.txt#L65

三、使用

然后运行test例子:

转载于:https://www.cnblogs.com/lit10050528/p/4887282.html