精华内容
下载资源
问答
  • 基于C++11重写muduo库

    千次阅读 2021-05-28 17:41:44
    文章目录预备知识muduo库简介muduo库的优点muduo和libeventone loop per process言归正传muduo的模型设计Reactor模型muduo库的Reactor模型one (evnet)loop per threadC++11重写muduo核心类代码muduo库的核心类代码...

    邮箱:2877944240@qq.com

    源码:基于C++11重写muduo库

    如有转载,请标明出处,谢谢!

    预备知识

    muduo库简介

    ​ muduo库是一个高质量的基于Reactor模型的开源网络库,采用one loop per thread + thread pool 的架构来实现的,只支持linux系统;

    ​ 这里附上作者陈硕大牛的介绍发布一个基于 Reactor 模式的 C++ 网络库

    muduo库的优点

    既然是one loop per thread就从两方面比,one loop 和 per thread

    muduo和libevent

    笔者libevent只是简单用过,没有去深入的刨析源代码,以下如有不对,请不吝指出!

    libevent使用c写的一个网络库也是基于Reactor模型,它没有用多个loop的这个思想,整体就是一个大的Reactor,自然也没有分发链接的过程,libevent主线程和io线程中间使用socketpair通信的;那换一句话说,就是muduo的one loop、一个线程一个loop有啥优缺点

    思考一下可以大体知道,当sockfd上有事件发生(可能是新用户链接也可能是读写事件),所有的线程都会竞争这个事件的执行权,但是最后只有一个线程执行这个操作,显得其他线程无事可做倒有点浪费资源;

    而muduo库有baseLoop去执行新用户的链接,其他ioLoop只负责固定连接的读写事件,可以并发执行多个连接的读写事件,充分利用CPU

    陈硕大牛是这样说的:

    multi-reactors可以处理突发IO(利用多线程处理多个连接上的IO)

    总结下来就是:计算量大,IO带宽小,去用单个eventloop。如果IO带宽大,计算量小,则用多个eventloop。对延时敏感也是用多个eventloop。

    由于本文重点刨析muduo源码,这些知识等笔者再去深入研究后再隔篇重讲!

    one loop per process

    one loop per process就是一个进程一个事件循环,这个问题归根结底就是讨论多进程和多线程的区别

    虽然线程比进程消耗小,但是不一定比进程快

    总结下来就是:进程间的切换消耗是大于线程的,但是一个进程阻塞了不影响另一个线程,同一个进程的一个线程阻塞了则会影响其他线程;

    举个例子:两段公路A和B,车子在A上面跑虽然耗油,但是没红绿灯,跑得快;

    ​ 而车子在B上面跑虽然省油,但是红绿灯多,跑的相对而言较慢;

    两者没有最好的,只有用在不同场景能体现其对应的价值!

    言归正传

    muduo的模型设计

    Reactor模型

    首先得明白reactor和proactor模型的区别

    reactor:用于同步IO,同步IO就是说,当内核缓冲区有数据时,IO函数被唤醒,IO函数一执行,开始从内 ​ 核缓冲区向用户缓冲区拷贝数据,所耗费的时间都记在程序用时;

    proactor:用于异步IO,异步IO就是说用户把sockfd、buf、和通知方式(不止信号还有回调)通过IO接口
    ​ 传给内核,当内核缓冲区有数据时,IO函数被唤醒,IO函数一执行内核缓冲区就有数据;内核会
    ​ 把数据拷贝到用户提供的buf中,并按照用户提供的通知方式应用程序,这个拷贝的过程,全程都
    ​ 是内核完成的,不计入应用程序耗时;

    Reactor是由**Event(事件)、Reactor(反应堆)、Demultiplex(事件分发器)和Evanthandler(事件处理器)**四部分构成的
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7TipYZCu-1622194885083)(C:\Users\86155\AppData\Roaming\Typora\typora-user-images\1622117752496.png)]

    Event:注册事件和其对应的处理方法给ReactorReactor里边有sockfd以及其对应的event

    Reactor:向其Demultiplex添加 ( 此处以add为例,其他方法也一样 ),启动反应堆

    Demultiplex:开启事件循环,将发生事件的event返回给Reactor

    EventHandlerReactor调用event对应的事件处理器EventHandler执行相应的回调函数

    muduo库的Reactor模型

    • channel封装了Event
    • EventLoop就是一个Reactor
    • Poller就是一个Demultipex
    • channel里边有对应的回调函数[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-W66QXO1A-1622194885091)(C:\Users\86155\AppData\Roaming\Typora\typora-user-images\1622122115122.png)]
    • EvnetLoop在一个单独的进程里边执行(one (event)loop per thread)
    • 一个EventLoop里边有一个Poller和多个Channel,其中Pollerchannel不能相互调用,必须通过EventLoop来进行相关数据的传输
    • channel需要调用EventLoop的方法,对channelMap里边的channel进行增删改查
    • Poller监听sockfd,并把返回的有事件发生的fd对应的channel添加到activechannel里边
    • EventLoop通知channel执行对应的回调

    one (evnet)loop per thread

    在muduo库里边有两种线程:一种里边的事件循环专门处理新用户连接,一种里边的事件循环专门处理对应连接的所有读写事件

    mainLoop( 也就是baseLoop): 负责处理新用户的连接

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oh5M0F25-1622194885116)(C:\Users\86155\AppData\Roaming\Typora\typora-user-images\1622124907054.png)]

    • Acceptor主要封装了listenfd( 对应的新用户连接事件 )的相关操作,将listenfd封装成了acceptChannel,并注册到其Poller中,Poller监听到事件发生后,mainLoop通知acceptChannel执行相关回调(此回调是Acceptor传给的)

    • 执行Acceptor::handleread时,先进行一个accpte操作,再执行一个回调,newConnectioncallbackTcpServer传给的,轮询选择一个subloop

    • connfd封装成channel分发给subloop,此loop负责此connfd上的所有操作;

      ioLoop:负责处理连接上的各种操作;包括建立连接、断开连接以及发送数据、接收数据

    ​ muduo库底层实现了一个buffer,负责数据的存储,用于接收和发送

    C++11重写muduo核心类代码

    muduo库的核心类代码

    辅助类

    Noncopyable

    此类主要做的就是通过下面两行代码,让其派生类不能进行拷贝构造和赋值,让代码更易读

          noncopyable(const noncopyable&) = delete;
          //让基类不能进行拷贝构造,从而使派生类也不能进行拷贝构造
          noncopyable& operator=(const noncopyable&) = delete;
          //让基类不能进行赋值运算,从而使派生类也不能进行赋值运算
    

    Timestamp

    此类主要为muduo库的一些log提供当前时间

         Timestamp();
         explicit Timestamp(int64_t microSecondsSinceEpoch);
         static Timestamp now();
         //获取当前的时间;直接return Timestamp(time(nullptr))
         std::string toString() const;
    

    Logger

    此类为一些操作提供相应日志信息,方便进行错误的追踪,采用的是饿汉模式(线程安全)

        INFO,  //普通信息
        ERROR, //错误信息
        FATAL, //core信息
        DEBUG, //调试信息
    

    INFO:打印一些重要的流程信息

    ERROR:不影响程序继续执行的错误信息

    FATAL:影响程序继续执行的致命信息

    DEBUG:调试信息

    一共有四种级别的信息,每个函数都有其对应的宏函数LOG_INFO、LOG_ERROR、LOG_FATAL、LOG_DEBUG

    #define [对应的宏](logmsgFormat, ...) \
        do \
        { \
            Logger &logger = Logger::instance();  \
            logger.setLogLevel([信息类型]);   \
            char buf[1024] = {0}; \
            snprintf(buf,1024,logmsgFormat,##__VA_ARGS__); \
            logger.log(buf); \
        } while(0) 
    

    socket

    该类主要封装了sockfd对应的操作

        int fd() { return sockfd_; }
        void bindAddress(const InetAddress &localaddr);
        void listen();
        int accept(InetAddress *peeraddr);
    

    InetAddress

    该类主要封装了socket的地址信息,可以得到ip地址和端口号

        explicit InetAddress(uint16_t port = 0,std::string ip = "127.0.0.1");
        explicit InetAddress(const sockaddr_in &addr)
            :addr_(addr)
            {}
        std::string toIp() const;   //得到ip
        std::string toIpPort() const;    //得到ip加端口号
        uint16_t toPort() const;   //得到端口号
        
        const sockaddr_in* getSockAddr() const{return &addr_;}
        void setSockaddr(const sockaddr_in &addr) { addr_ = addr;}
    

    Buffer

    muduo库底层的发送、接收的消息都是存放在Buffer缓冲区的

        static const size_t kCheapPrepend = 8;   //用来初始化首部信息
        static const size_t kInitialSize = 1024;   //用来初始化数据区
    
    //构造函数
    explicit Buffer(size_t initialSize = kInitialSize)     //initialSize是数据区大小·
                 :buffer_(kCheapPrepend + kInitialSize)    //缓冲区的大小等于首部加数据区的大小
                 ,readerIndex_(kCheapPrepend)              //可读区域的首地址下标
                 ,writerIndex_(kCheapPrepend)              //可写区域的首地址下标
                 {}
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8iIwk3yl-1622194885121)(C:\Users\86155\AppData\Roaming\Typora\typora-user-images\1622183670567.png)]
    各种关系大体都是围绕这几个变量计算的

        //可读的大小
        size_t readableBytes() const 
        {
            return writerIndex_ - readerIndex_;
        }
    
        //可写的大小
        size_t writableBytes() const
        {
            return buffer_.size() - writerIndex_; 
        }
    

    CurrentThread

    该类通过系统调用获取当前线程的tid

        extern __thread int t_cachedTid;
    
        void cacheTid()
        {
           if (t_cachedTid == 0)
           {  
              //通过系统调用获取了当前线程的tid值
              t_cachedTid = static_cast<pid_t>(syscall(SYS_gettid)); 
           }
        }
    
        inline int tid()
        {
            //如果线程tid等于0就调用这个(通过系统调用获取线程tid的函数)来获取线程的tid
            //如果线程tid不等于0就返回它自己的;
            if (__builtin_expect(t_cachedTid == 0,0))
            {
               cacheTid();
            }
            return t_cachedTid;
        }
    

    TcpServer以及其相关类

    TcpServer

        void newConnection(int sockfd, const InetAddress &peerAddr);  //增加新连接
        void removeConnection(const TcpConnectionPtr &conn);  //删除链接
        void removeConnectionInloop(const TcpConnectionPtr &conn); //在一个loop中删除该链接
     
        using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>; //连接的名字和对应的连接
    
    
        EventLoop *loop_;     //默认的loop,即baseloop
        const std::string ipPort_;   //ip和端口号
        const std::string name_;    //名字
    
        std::unique_ptr<Acceptor> acceptor_;  //专门负责处理新用户的链接
        std::unique_ptr<EventLoopThreadPool> threadPool_;   //one loop per thread;
    
        ConnectionCallback connectionCallback_;  //有新链接的回调
        MessageCallback messageCallback_;      //有读写事件的回调
        WriteCompleteCallback writeCompleteCallback_; //消息发送完成以后的回调
    
        ThreadInitCallback threadInitCallback_; //loop线程初始化的回调
    
        std::atomic_int started_;
    
        int nextConnId_;
        ConnectionMap connections_; //保存所有的链接
    
        TcpServer(EventLoop *loop,     //默认的loop,即Baseloop  
                  const InetAddress &listenAddr,    //地址信息
                  const std::string &nameArg,    //名字
                  Option option = kNoReusePort);   //端口是否重用
    

    构造函数,主要传的参数就是前三个

        void setThreadInitcallback(const ThreadInitCallback &cb){ threadInitCallback_ = cb; }
    //线程初始化回调函数
        void setConnectionCallback(const ConnectionCallback &cb){ connectionCallback_ = cb; }  //链接事件回调函数
        void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }
    //读写事件回调函数
        void setWriteCompleteCallback(const WriteCompleteCallback &cb)          {writeCompleteCallback_ = cb; }
    

    其中新用户链接的回调会传给Accptor

    ​ 读写事件的回调会传给TcpConnection

    TcpServer主要的做的事就是

    • setThreadNum
    void TcpServer::setThreadNum(int numThreads) 
      //设置底层EventLoop的数量,就是ioLoop的数量,若数量为零,那connfd的读写事件也由baseLoop执行
    {
       threadPool_->setThreadNum(numThreads);
    }
    
    • start
    void TcpServer::start() //开启服务监听
    {
       if(started_++ ==0)  //防止一个TcpServer对象被start多次
       {
           threadPool_->start(threadInitCallback_);    //启动底层线程池
           loop_->runInloop(std::bind(&Acceptor::listen,acceptor_.get()));
           //启动acceptor所在的loop,并且acceptor开始监听新用户的链接
       }
    }
    

    还有就是它负责实现Acceptor和TcpConnection所要调用回调

    Acceptor

    此类主要处理新用户的链接,并且链接得到connfd,这个connfd里边的读写事件由TcpConnection执行
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2li3E0CK-1622194885124)(file:///C:\Users\86155\Documents\Tencent Files\2877944240\Image\C2C\33{GQ}R02~3QTGW$@Z_S`KO.png)]
    还是这个图,Acceptor是在baseLoop里边执行的

    void handleRead();
    EventLoop *loop_;   //Accptor用的就是用户定义的那个baseLoop,也称mainLoop
    Socket acceptSocket_;  //它负责监听的就是acceptSocket_上的事件
    Channel acceptChannel_;   //将acceptSocket_封装成acceptChannel_
    NewConnectionCallback newConnectionCallback_;   //当loop_里的poller发现acceptSocket_上有事件发生时,向loop_返回acceptChannel_,Acceptor执行此回调
    bool listenning_;
    

    acceptor的核心就是handleRead

    void Acceptor::handleRead()
    {
       InetAddress peerAddr;
       int connfd = acceptSocket_.accept(&peerAddr);
       if(connfd >= 0)
       {
           if(newConnectionCallback_)
           {
               newConnectionCallback_(connfd,peerAddr);
           }
           else
           {
               ::close(connfd);
           }
       }
       else{
           LOG_ERROR("%s:%s:%d accept err:%d \n",__FILE__,__FUNCTION__,__LINE__, errno);
           if(errno == EMFILE)
           {
              LOG_ERROR("%s:%s:%d reached limit! \n",__FILE__,__FUNCTION__,__LINE__, errno);
           }
       }
    
    }
    

    当有新用户链接时执行这个回调,先accpet得到connfd,再执行newConnectioncallback(通过轮询的方式将connfd分发给ioLoop),当然这个回调都是传的TcpServer的方法,是在TcpServer里边实现的;

    TcpConnection

    此类主要是负责connfd的各种操作,因此回调比较多,都是TcpServer传给的

        ConnectionCallback connectionCallback_;  //有新链接的回调
        MessageCallback messageCallback_;      //有读写事件的回调
        WriteCompleteCallback writeCompleteCallback_; //消息发送完成以后的回调
        HighWaterMarkCallback highWaterMarkCallback_; //高水位线
        CloseCallback closeCallback_;
    
        EventLoop *loop_;  //这里的loop绝对不是baseloop,因为TcpConnection都是在subloop里边管理的
        const std::string name_;  //Tcpconnection的名字
        std::atomic_int state_;   //链接的状态
        
        enum StateE {kDisconnected //已经断开的
                    ,kConnecting  //正在链接
                    ,kConnected  //已经连接上了
                    ,kDisconnecting};  //正在断开 
        bool reading_;
    

    由于Buffer是在发送、接收数据时用到的,因此Buffer是在TcpConnection里面用到的

    EventLoop以及其相关类

    Channel

    此类负责打包sockfd(acceptfd和connfd),不同fd对应的loop提醒channel执行的回调也是不同的

    acceptfd的回调是传的Acceptor的,对应的connfd的回调是传的TcpConnection的

    它的主要成员变量如下

       EventLoop *loop_;  //事件循环(所属的事件循环)
       const int fd_;   //fd,就是poller监听的对象
       int events_;  //注册fd感兴趣的事件
       int revents_;  //发生的事件
       int index_;  //表示的就是channel的状态(未添加、已添加和已删除)
       //新创建的channel默认值为-1,通过updatechannel和removechannel的一个操作改变其对应的值
    
       void update();
       void remove();
       void handleEventWithGuard(Timestamp receiveTime);  //handle(处理) Event(事件) WithGuard(受保护的)
    

    还有对Poller里边map表里边的channel_的管理是通过EventLoop对Poller执行的

    因此这几个方法底层直接调用的EventLoop的对应方法

    channel就主要是封装了fd和其对应的回调

       //因为Channel通道里边能够获知fd最终发生的具体的事件revent,所以它负责调用具体事件的回调操作;
       ReadEventCallback readCallback_;
       EventCallback writeCallback_;
       EventCallback closeCallback_;
       EventCallback errorCallback_;
    

    Poller

    此类是一个抽象接口类,向下提供不同的IO复用函数的接口

        //给所有的IO复用保留统一的接口,需要它们去重写的;
        //当前激活的channel,运行的channel,需要Poller去照顾的channel都在ChannelList里边;
        virtual Timestamp poll(int timeoutMs,ChannelList *activeChannels) = 0;
        virtual void updateChannel(Channel *channel) = 0;
        virtual void removeChannel(Channel *channel) = 0;
    
        using ChannelMap = std::unordered_map<int,Channel*>;  
        //poller监听的channel是EventLoop(EventLoop里边有ChannelList)里边保存的channel
    

    EpollPoller

    此类是用EPollio复用实现的Poller,主要管理和监听channel,并将sockfd有事件发生的channel返回给EventLoop,由EventLoop通知channel执行相应的回调

        //还负责重写基类的方法
        static const int kInitEventListSize = 16;  //保存事件的vector初始化大小
        
        //填写活跃的channel在activeChannel(ChannelList)里边
        void fillActiveChannels(int numEvents, ChannelList *activeChannel) const;
        //更新channel通道
        void update(int operation, Channel *channel);
    
        using EventList = std::vector<epoll_event>;  //事件集合
    
        int epollfd_;
        EventList events_; 
    

    EventLoop

    此类相当于一个Reactor的角色,向Poller的map表里边注册fd事件对应的channel,并接受有事件发生的channel,并通知channel执行相应的回调

        void handleRead();  //wakeup  通过给wakeupfd上写入数据,唤醒ioLoop
        void doPendingFunctors();  //执行回调
    
        using ChannelList = std::vector<Channel*>;   
        std::atomic_bool looping_;  //原子操作,底层通过CAS实现的
        std::atomic_bool quit_;  //标志退出loop循环
    
        const pid_t threadId_;  //记录当前loop所在线程id
        Timestamp pollReturnTime_;  //poller返回发生事件的channels的时间点
        std::unique_ptr<Poller> poller_;
    
        int wakeupFd_; //主要作用,当mainloop获取一个新用户的channel,通过轮训算法选择一个subloop,通过该成员唤醒subloop处理
        std::unique_ptr<Channel> wakeupChannel_;
    
        ChannelList activeChannels_;
        Channel *currentActiveChannel_;
    
        std::atomic_bool callingPendingFunctors_;  //标识当前loop是否需要执行的回调操作
        std::vector<Functor> pendingFunctors_;  //存储loop需要执行的所有的回调操作
        std::mutex mutex_; //互斥锁,用来保护上面vector容器的线程安全操作
    
    

    至此muduo库重要的组件以及每个EventLoop实现的功能已经很清楚了,现在需要考虑的就是在多线程下,实现one loop per thread

    EventLoopThread

    此方法是让一个loop在单独创建的线程中执行

        EventLoop *loop_;   //对应的loop
        bool exiting_;      //线程的状态
        Thread thread_;     //对应的线程
        std::mutex mutex_;   
        std::condition_variable cond_; 
        ThreadInitCallback callback_;
    
       
    EventLoop* EventLoopThread::startLoop()
    {
       thread_.start();  //loop开启时,底层的线程开启了,回调函数threadFunc()就开启了
    
       EventLoop *loop = nullptr;
       {
          std::unique_lock<std::mutex> lock(mutex_);      
          while( loop_ == nullptr )
          {
             cond_.wait(lock);
          }
          loop = loop_;
       } 
       return loop;
    }
    
    //此方法是在单独的线程中执行的
    void EventLoopThread::threadFunc()
    {
       EventLoop loop;  //创建一个独立的eventloop,和上面的线程一一对应
    
       if(callback_)
       {
           callback_(&loop);
       }
    
       {
           std::unique_lock<std::mutex> lock(mutex_);
           loop_ = &loop;
           cond_.notify_one();
       }
    
       loop.loop();  //loop会一直执行,到下一步说明loop已经结束循环了;
       
       std::unique_lock<std::mutex> lock(mutex_);
       loop_ = nullptr;
    }
    

    通过条件变量可以保证创建的线程和运行的loop一一对应

    EventLoopThreadPool

    还记得刚开始TcpServer设置了ThreadNum,在这个类中会根据设置的数量开启相应的线程,每个线程都和一个ioLoop一一对应,并且阻塞在recv等待被唤醒;

        EventLoop *baseLoop_;
        std::string name_;
        bool started_;
        int numThreads_;
        int next_;     //做下一个loop的下表用的;
        std::vector<std::unique_ptr<EventLoopThread>> threads_;  //包含了Loop线程
        std::vector<EventLoop*> loops_;  //包含了所有loop的指针; (还记得调用EventLoopThread里边的startLoop()就能得到loop指针)
    
        //TcpServer的两个方法底层就是调用者俩个方法就是调用的
        void setThreadNum(int numThreads) { numThreads_ = numThreads;}     
        void start(const ThreadInitCallback &cb = ThreadInitCallback())
        {
        started_ = true;
        for(int i=0; i<numThreads_; i++)
        {
           char buf[name_.size() + 32];
           snprintf(buf, sizeof buf, "%s%d", name_.c_str(),i);
           EventLoopThread *t = new EventLoopThread(cb,buf);
           threads_.push_back(std::unique_ptr<EventLoopThread>(t));  //根据开启的线程数开启相应的线程,
           loops_.push_back(t->startLoop());  //执行startLoop函数,返回loop指针,(没执行threadFunc灰调函数的话,线程会阻塞)
                               //底层创建线程,绑定一个新的EventLoop,并返回该loop的地址
        }
        if(numThreads_ == 0 && cb)
        {
            cb(baseLoop_);       //cb(ThreadInitCallback)(用户提前设置的回调)
        }
        }
    

    在Acceptor的HandRead回调函数中,得到connfd,然后通过轮询的方法得到下一个loop,底层也是调用的Pool里边的GetNextLoop()方法;

    
    //如果工作在多线程中,baseLoop_默认以轮训的方式纷飞channel给subloop
    EventLoop* EventLoopThreadPool::GetNextLoop()
    {
        EventLoop *loop = baseLoop_;
        if(!loops_.empty())
        {
           loop = loops_[next_];
           next_++;
           if(next_ >= loops_.size())
           {
            next_ = 0;
           }
        }
        return loop;
    }
    

    总结

    核心代码到这里就基本拉下序幕了,我们可以再总结一下

    • muduo库采用one loop per thread + thread pool的模型;

    • 总体是基于Reactor模型的(EventLoop就是一个Reactor、Poller就是一个事件分发器);

      sockfd以及其对应的回调操作被打包成channel,由EventLoop将其添加到Poller的map表里边,Poller将发生事件的channel返回在EventLoop的List表里边,EventLoop通知channel执行相应的回调操作;

    • muduo库的TcpServer负责传入用户写的回调操作;

      此时fd分为两种p(acceptfd和connfd),分别和其对应的事件打包成channel,acceptfd感兴趣的事件是新用户的链接事件,因此acceptChannel被添加到baseLoop的Poller里边;

      当有新用户链接时,执行acceptfd对应的回调操作,并得到connfd,此时将connfd和其感兴趣的事件添加到ioLoop的Poller里边,当对应的事件发生时,调用相关的回调操作!

    参考文献:施磊《腾讯课堂》

    展开全文
  • 什么是muduo库 Muduo 是基于 Reactor 模式、线程安全的、支持多核多线程的简单易用的网络库。符合现代C++编程规范、大量使用boost的开源网络库。 为什么学习muduo库 muduo库对学习linux下面向过程C++编程、多...

    什么是muduo库

     

    Muduo 是基于 Reactor 模式、线程安全的、支持多核多线程的简单易用的网络库。符合现代C++编程规范、大量使用boost的开源网络库。

    为什么学习muduo库

    muduo库对学习linux下面向过程C++编程、多线程编程、boost库应用实践,有非常大的帮助。通过分析muduo的源代码,深入理解服务器端多线程网络开发实现。此外,muduo库提供了丰富的示例程序,非常有助于使用者深入学习非阻塞网络编程。

    muduo库概况

    代码结构:

    其中,5个关键类:Buffer、EventLoop、TcpConnection、TcpClient、TcpServer。

    类图

    对muduo库有一个基本认识后,后面将会从易到难逐个文件去了解muduo库。

                                                                                                                      立此flag

    展开全文
  • 模拟muduo库实现nonnon-blocking + IO-multiplexing + loop线程模型的高并发 TCP 服务器模型。 开发环境 Centos7 技术栈 C++、多线程、socket网络编程、epoll多路转接 项目设计 整体采用non-blocking + IO-...

    项目简介

    模拟muduo库实现nonnon-blocking + IO-multiplexing + loop线程模型的高并发 TCP 服务器模型。

    开发环境

    Centos7

    技术栈

    C++、多线程、socket网络编程、epoll多路转接

    项目设计

    整体采用non-blocking + IO-multiplexing + loop线程的设计框架,其中线程模型采用one loop per thread的多线程服务端网络编程模型,结合reactor模型进行实现。

    整体框架

    在这里插入图片描述

    Reactor模型

    重要组件:
    Event事件、Reactor反应堆、Dumultiplex事件分发器、EventHandler事件处理器。

    在这里插入图片描述
    首先服务器将用户的所关心的Event事件以及事件发生后的Handler处理函数打包注册到Reactor反应堆上,由Reactor向Demultiplex事件分发器的Epoll添加、修改、删除Event,并且启动Reactor反应堆,开启事件循环;当有事件发生时,Demultiplex就会通知Reactor,此时Reactor就会调用服务器事先注册的Event所对应的EventHandler去处理事件。

    组件设计

    logger组件

    logger组件主要是用于日志信息的封装,给用户提供提示信息。当出现编码错误时,日志会提供错误信息方便调试。
    主要包含以下模块
    LOG_INFO:提示信息。
    LOG_ERROR:错误信息(不影响程序正常执行)。
    LOG_FATAL:致命错误信息(导致程序崩溃)。
    LOG_DUBUG:调试信息

    channel组件

    channel可以理解为通道,对应到reactor即为Event。它封装了用于网络通信的sockfd。sockfd所对应的用户感兴趣的事件events,以及存储发生事件的revents。主要有以下功能:

    1.更新epoll中用户所关心的事件
    2.采用bind和functional函数对象机制提供了设置用户关系时间发生后的回调处理函数。
    3.当有用户感兴趣的事件发生时,根据发生时间的不同,调用用户所设定的回调函数,处理事件。

    也就是说,channel其实就是将Event以及Handler打包后的产物,用sockfd进行标识,同时提供了一些函数接口,便于后续操作。

    Poller & EpollPoller组件

    Poller组件

    Poller组件是多路事件分发器的核心。在muduo库中,poller组件其实就是对poll和epoll的一个抽象,在实现上采用多态机制。为poll和epoll提供的抽象类,poll和epoll通过继承poller并重写poller所提供的纯虚函数接口,以实现多路转接模型。它封装了sockfd以及指向sockfd所打包的channel的指针,以sockfd为键,以channel*为值存储到无序关联容器unordered_map中。并且给派生类即EpollPoller提供了可供其重写的纯虚函数。

    EpollPoller组件

    EpollPoller组件通过继承Poller并且重写其提供的纯虚函数接口,实现epoll多路转接模型。EpollPoller其实就是将epoll的接口封装成了一个类,提供向epoll中添加、修改、删除所关心的事件的接口以及开启事件监听的函数接口,并且对外还提供了向用户返回发生事件的接口以及更新channel的接口。
    Poller和EpollPoller对应的reactor模型中即就是Demultiplex事件分发器。

    EventLoop组件

    Eventloop组件即为事件循环,对应的模型中的Reactor反应堆,主要的功能就是:
    1.开启事件循环
    2.对EpollPoller以及channel进行操作。channel调用EventLoop提供的函数接口将用户所感兴趣的事件注册到epoll上,还可以通过Eventloop对用户所感兴趣的事件,在epoll上进行添加、修改、删除等操作,当有事件发生时EpollPoller就会将所发生事件通过参数反馈给EventLoop,EventLoop再调用用户预先在channel中设置的回调函数EventHandler对发生事件进行处理。
    3.如果当前所发生的事件是运行在其他线程上的loop的所监听的事件,就需要唤醒阻塞epoll_wait上的线程,执行相应的EventHandler操作。通过系统调用接口eventfd创建一个wakeupFd,并打包成channel注册到所在线程的EpollPoller中,如果调用回调的loop所属线程不是当前线程,那么就向这个loop所属的wakeupFd对应的事件中写入数据,用以唤醒loop所在的线程执行相应的回调。

    Thread & EventLoopThread & EventLoopThreadPool

    Thread组件

    Thread组件顾名思义就是线程,所以其实现的功能主要也都是与线程相关的:

    1.创建线程并且执行线程入口函数。
    2.设置线程等待。

    EventLoopThread组件

    EventLoopThread事件循环线程,是对Thread组件的一层封装,主要功能就是提供启动底层的线程(调用Thread所提供的创建线程的方法)并且为将要创建的线程设置入口函数。
    EventLoopThreadPool组件为事件循环线程池,主要功能是:

    1.设置线程数量threadnum。
    2.创建用户所设置的threadnum个线程,并且将每个线程上所创建的loop的地址返回。
    通过轮询算法选择一个loop,将新连接用户所关心的channel分发给loop。

    Socket & Acceptor组件

    Socket组件

    Socket组件是对TCP网络编程的一个抽象,将TCP网络编程的函数接口抽象成了Socket类,主要的功能就是:

    创建套接字。
    绑定地址信息。
    监听新连接的到来。
    获取新连接用于网络通信的套接字socket。

    Acceptor组件

    Acceptor组件是在Socket组件的基础上进行了封装,主要功能:
    1.创建用于监听和获取新连接的accpetSocket,为其绑定地址信息。
    2.将accpetSocket以及其感兴趣的事件(新连接到来)打包成accpetChannel。
    3.开启监听新连接到来并将accpetChannel通过EventLoop注册到所属的epoll中去。
    其实acceptorChannel所属的loop就是mainLoop。

    buffer组件

    buffer组件是网络库底层的数据缓冲区,主要解决的问题就是当TCP接收缓冲区较小但是用户需发送数据过多或者TCP发送数据过多但是我们只需要读取其中一部分数据,未发送的数据或者未读取的数据的存储问题。它主要实现的功能就是:

    1.将未发送的数据或者未读取的数据存储下来。
    2.对缓冲区进行扩容。
    3.向调用发返回读、写缓冲区的大小。
    其模型为:
    在这里插入图片描述

    TcpConnection组件

    TcpConnection组件主要做于新用户连接后的一系列回调操作。一个连接成功的客户端对应一个TcpConnection,其主要封装了连接成功的用户的socket,Channel,当有读写事件发生时,对发送缓冲区和接收缓冲区进行操作,并且为当前连接的用户设置各种的回调操作。

    TcpServer组件

    TcpServer组件是所有组件的总调度者,供用户创建TcpServer对象,使用TCP服务器的,它对外提供了设置建立新连接回调函数、读写消息回调函数、开启事件循环、设置线程数量并创建线程等函数接口,对内提供并绑定了有新用户连接时的回调函数,通过轮询算法分发新连接用户到EventLoopThread上(即subLoop)。并且设置了关闭连接的回调函数。它主要封装了Acceptor以及EventLoop ThreadPool。

    项目各模块交互流程梳理

    用户使用muduo库编写服务器程序的流程

    编写TcpServer类:在构造函数中绑定连接建立和断开的回调函数以及可读写事件的回调函数,设置线程个数(一般和CPU核心数相同),对外start()提供启动服务的接口,实现连接建立和断开的回调函数以及可读写事件的回调函数。
    编程main函数:创建事件循环对象,提供InetAddress地址信息,创建TcpServer服务器对象,启动服务,开启事件循环。

    TCP服务器建立

    用户建立TCP服务器,在创建TcpServer对象时,其构造函数会创建Acceptor对象以及EventLoopThreadPool对象,并且通过Acceptor的setNewConnectionCallback方法为Acceptor对象中所封装的acceptChannel绑定有新连接到来时的回调函数。
    其中Acceptor对象的构造函数会做以下三件事:

    1.创建用于监听新用户到来的监听套接字listenfd,绑定地址信息。
    2.将listenfd和监听新用户连接事件打包成acceptChannel。
    3.设置事件发生后的回调函数。

    当用户调用start()方法启动服务后,EventLoopThreadPool对象就会调用自己的start方法创建用户所设置的threadnum个线程,并且启动所创建的线程。Acceptor对象调用自己的listen方法做以下两件事:

    1.调用Sockets所封装的listen方法开始监听新连接的到来;
    2.调用acceptChannel的enableReading方法,将listenfd所关心的事件注册到EpollPoller上。

    用户调用所创建的事件循环对象的loop方法,开启事件循环。在loop方法中,会通过EventLoop所封装的poller对象调用EpollPoller的poll方法,即调用epoll_wait开始监听注册的事件。此时服务器的服务就启动完成了。

    新用户的连接

    当有新用户连接时,当前loop对象底层的EpollPoller就会监听到acceptChannel的读事件发生,进而调用构造Acceptor对象时绑定的回调函数,在回调函数中,会做以下两件事:

    1.调用Acceptor所封装的acceptSocket_的accept函数获取新连接用于通信的套接字sockfd以及地址信息。
    2.调用构造TcpServer时绑定的处理新连接的回调函数,并将新连接用于通信的套接字以及地址信息传给该回调函数。

    处理新连接的回调函数所做事情:

    1.根据轮询算法选择一个subLoop。
    2.获取sockfd对应的地址信息。
    3.创建TcpConnection对象,把选择的subLoop绑定到TcpConnection所属线程上,将用户设置的连接建立和断开的回调函数以及可读写事件等回调函数通过TcpConnection对象所提供的方法设置给当前TcpConnection连接,并且设置服务器内部关闭连接的回调函数。
    4.执行TcpConnection对象建立连接的回调函数(唤醒TcpConnection所属线程,建立连接回调函数)。在建立连接回调函数中,向EpollPoller中注册当前sockfd所打包channel的读写事件,并且执行用户设置的连接建立和断开的回调函数。此时,连接就建立成功了。

    数据通信

    当用户有读事件发生时,底层的EpollPoller就会将发生的事件上报给EventLoop,EventLoop此时就会调用事先给channel所绑定的读事件的回调函数进行处理。而这个回调函数是用户通过TcpServer的setMessageCallback方法设置的,TcpServer在处理新连接的回调函数中构造TcpConnection对象时,通过TcpConnection所提供的setReadCallback将其绑定到TcpConnection对象所对应的channel中的。所以最终相应到了用户所设置的可读写事件的回调函数上。

    关闭连接

    当连接断开时,底层的EpollPoller就会将发生的事件上报给EventLoop,EventLoop此时就会调用事先给channel所绑定的关闭连接事件的回调函数进行处理。TcpConnection所做的事情就是删除该连接以及将所对应的sockfd以及channel从Poller的无序管理容器中删除,并且将注册在EpollPoller上的事件全部删除。
    在这里插入图片描述

    展开全文
  • 五个简单TCP协议2.muduo库网络模型使用示例 1.五个简单TCP协议 MuduoManual.pdf P50 2.muduo库网络模型使用示例 sudoku求解服务器MuduoManual.pdf P35 (1)reactor(一个IO线程) (2)multiple reactor (多个...

    1.五个简单TCP协议

    • MuduoManual.pdf P50

    • (1)echo - 回显服务,把收到的数据发回客户端

    • eg测试:42\jmuduo\examples\simple\echo\echo.h
      42\jmuduo\examples\simple\echo\main.cc
      42\jmuduo\CMakeLists.txt

    • 测试:客户端使用nc
      在这里插入图片描述
      服务端
      在这里插入图片描述
      客户端退出后,服务端一个TcpConnection销毁了
      在这里插入图片描述

    • (2)discard - 服务器端收到客户端发送过来的数据,丢弃所有收到的数据;

    • eg:42\jmuduo\examples\simple\discard\discard.cc

    • (3)daytime - 服务端 accept 连接之后,以字符串形式发送当前时间,然后主动断开连接;

    • eg测试:42\jmuduo\examples\simple\daytime\daytime.cc

    • 测试:
      在这里插入图片描述

    • (4) time - 服务端 accept 连接之后,以二进制形式发送当前时间(从 Epoch 到现在的秒数),然后主动断开连接;我们需要一个客户程序来把收到的时间转换为字符串。

    • eg测试:

    • 42\jmuduo\examples\simple\time\time.cc
      42\jmuduo\examples\simple\timeclient\timeclient.cc

    • 测试:
      服务端
      在这里插入图片描述
      客户端
      在这里插入图片描述
      在这里插入图片描述

    • (5)chargen(字符串生成协议,ASCII:33-126为可打印字符) - 服务端 accept 连接之后,不停地发送测试数据。

    • 大致规律如下:
      每一行都是72个字符,总共94行,下面是一组消息(即一个消息),每94行为一个单位
      33 34 35 。。。104
      34 35 36 。。。105
      。。。
      55 56 57 。。。126
      56 57 58 。。。126 33
      。。。
      126 33 34 35 .0。。 103

    • eg:42\jmuduo\examples\simple\chargen\chargen.cc
      42\jmuduo\examples\simple\chargenclient\chargenclient.cc

    • eg测试:42\jmuduo\examples\simple\chargen\main.cc

    • 测试:
      客户端使用nc
      在这里插入图片描述
      服务端
      在这里插入图片描述
      吞吐量不是很大,因为TCP有流量调节的功能,即使服务端生成数据很快,但是客户端接收或者处理数据慢,则服务端就会调整它生成数据的速度,以更慢的频率发送数据。
      在这里插入图片描述

    • eg测试:42\jmuduo\examples\simple\chargenclient\chargenclient.cc
      42\jmuduo\examples\simple\chargen\main.cc

    • 测试:客户端
      在这里插入图片描述
      服务端的吞吐量就很高了,基本上把千M网卡跑满了,1000M/8=125Mb
      在这里插入图片描述

    2.muduo库网络模型使用示例

    • sudoku求解服务器MuduoManual.pdf P35
      (1)reactor(一个IO线程)
      (2)multiple reactor (多个IO线程)
      (3)one loop per thread + thread pool (多个IO线程 + 计算线程池)

    • eg:(1)reactor(一个IO线程)
      42\jmuduo\examples\sudoku\server_basic.cc

    • eg:(2)multiple reactor (多个IO线程)
      42\jmuduo\examples\sudoku\server_multiloop.cc
      42\jmuduo\muduo\net\TcpServer.cc

    • eg:(3)one loop per thread + thread pool (多个IO线程 + 计算线程池)
      42\jmuduo\examples\sudoku\server_threadpool.cc
      42\jmuduo\muduo\net\TcpServer.cc
      当前只有一个reactor,当reactor涉及到大量的计算,就应该将计算放到计算线程池ThreadPool来处理,处理完毕后,结果通过IO线程send发送给客户端。
      在这里插入图片描述

    57:48

    • 总结:使用muduo库需要关注三个半事件:
      连接建立;
      连接断开;消息到达;
      消息发送完毕(对于低流量的服务来说,通常不需要关注该事件)
      (1)reactor(一个IO线程)
      提供一个XXXXServer类,在该类中包含一个TcpServer对象;
      OnConnection:包含连接建立和连接断开
      OnMessage:消息到达
      OnWriteComplete:消息发送完毕
      (2)要实现multiple reactor 模式(IO线程有多个),此时只需要增加server_setThreadNum(numThreads);,又进而调用EventLoopThreadPool::setThreadNum
      (3)若增加计算线程池来计算计算密集型的服务,则在编写的XXX服务器中,增加一个ThreadPool threadPool_;//计算线程池对象,threadPool_.start(numThreads_);启动若干线程,threadPool_.run(boost::bind(&solve, conn, puzzle, id));来处理计算任务,通过conn->send(result+"\r\n");IO线程来发送计算的结果
    展开全文
  • 重写muduo库

    2020-07-19 20:02:44
    模拟 muduo 实现non-blocking + IO-multiplexing + loop线程模型的高并发 TCP 服务器模型。 2. 开发环境: CentOS7 linux环境。 3. 技术栈: C++、多线程、socket网络编程、epoll多路转接。 4. 项目设计: 整体...
  • muduo库学习下载链接

    2021-03-08 22:09:37
    muduo库学习下载链接 github地址:https://github.com/chenshuo/muduo 使用muduo库帮助地址:https://github.com/chenshuo/muduo-tutorial muduo源码压缩包:https://pan.baidu.com/s/1miQNh1n2gNZNyd95p3dMUw 提取...
  • muduo库的使用

    2020-01-15 17:05:38
    muduo库的使用 参考博客: 1.https://blog.csdn.net/YoungSusie/article/details/90035042 2.https://blog.csdn.net/weixin_42405632/article/details/88057204 makefile文件主要修改了include和lib的查找路径,并...
  • muduo库学习篇-学习目的和计划

    千次阅读 2019-05-22 18:45:39
    从今天开始大概打算花一两个月的时间把陈硕大神的muduo库进行学习,此次学习muduo库主要有下面几个目的: 通过学习muduo库里面的各个基础库,强化自己对c++编程的理解和运用 通过学习muduo网络库加强自己对linux...
  • 东阳的学习笔记 1. 内容总结 前面几篇文章的内容归纳如下: 线程同步的四项原则,尽量使用高层同步设施(线程池、队列、倒计时) 使用普通互斥器和条件变量完成剩余的同步任务,采用 RAII惯用手法和 ...muduo库.
  • Muduo库中的Buffer设计

    2020-05-18 13:13:23
    Muduo库中的Buffer类设计 非阻塞网络编程中应用层buffer是必须的 原因:非阻塞IO的核心思想是避免阻塞在read()或write()或其他IO系统调用上,这样可以最大限度复用thread-of-control,让一个线程能服务于多个socket...
  • Linux下安装muduo库

    2020-03-28 12:29:56
    Linux下安装muduo库下载使用安装包创建文件夹将下载好的安装包传入服务器检查传输是否成功解压文件使用CMake编译构建程序 下载使用安装包 创建文件夹 将下载好的安装包传入服务器 检查传输是否成功 解压文件 ...
  • 文章目录1.muduo库对编写tcp客户端程序的支持 1.muduo库对编写tcp客户端程序的支持 Connector // 主动发起连接 TcpClient // 包含了一个Connector对象 测试程序 Reactor_test11.cc // echo server TcpClient_test....
  • QT链接muduo库

    2016-10-20 19:36:07
    muduo编译安装链接... 使用muduo库中自带的例子DiscardServer,路径examples/simple/DiscardServer; 代码直接拷贝即可。 在PRO文件中添加以下数据: LIBS += /usr/lib/muduo/libmudu
  • Muduo is a multithreaded C++ network library based on the reactor pattern. http://github.com/chenshuo/muduo Copyright (c) 2010, Shuo Chen. All rights reserved. Use of this source code is governed ...
  • muduo库整个项目使用cmake编译,所以需要先安装cmake。 使用yum安装,安装前确保配置了可用的yum源。 安装完cmake,我们便可以来编译muduo库了。 在muduo/路径下,直接执行./build.sh 从报错信息来看,我们...
  • 我相信用linux进行后端开发的C++程序员应该对muduo库都比较熟悉,muduo库利用了epoll+线程池对网络IO模块进行了封装,实现了高并发、高性能的网络库,通过简单的几个类能快速的建立起网络模块,非常的方便。...
  • 1、在介绍Logger类时,我们提到Logger类可以自定义Output xx函数和Flush xx函数,由用户来选择日志信息的输出位置以及flush方式,例如在muduo库学习笔记十三:base库之日志库一 Logger类示例程序中,我们就使用...
  • muduo库的EpollPoller剖析

    2018-02-14 11:38:07
    EpollPoller,是muduo库对I/O复用机制epoll的封装,不过默认使用的是EpollPoller。在EventLoop中初始化构造poller_,调用newDefaultPoller(this),构造默认的poller。EventLoop::EventLoop() : looping_(false), //...
  • Condition类:是muduo库中对系统线程条件变量类函数进行的封装;往往跟mutexlock配合使用, 但也不控制其对象的生存期。 类图: 条件变量使用规范为: wait函数主要工作为: 整个condition类主要为方便...
  • muduo库Thread类剖析

    千次阅读 2016-10-24 22:49:13
    muduo库中的Thread类集合了所有线程的操作,其中还运用了线程安全的观察者模式。 运用shared_ptr和weak_ptr做到了生命周期的管理。 代码加注释如下: Thread.h #ifndef MUDUO_BASE_THREAD_H #define MUDUO_BASE_...
  • Mutex互斥量:muduo库中的封装为MutexLock类与MutexLockGuard类。 类图: 继承boost::noncopyable,对象语义不能拷贝,该类主要是对一系列系统线程函数pthread_mutex_XXX的封装。 私有数据成员: mutex_:存储...
  • centos下Muduo库的安装

    千次阅读 2017-06-15 15:46:18
    1.下载muduo库,在作者陈硕github我们可以下载到mduo最新安装包。git clone https://github.com/chenshuo/muduo2.在安装muduo之前我们需要先安装Boost库和cmake工具,muduo有三个非必要依赖库,curl,c-areas DNS,...
  • muduo库源码阅读

    2016-07-07 19:53:12
    这两个月来状态一般,把leetcode刷完了,然后忙于阅读muduo库的源码,大概花了两个星期,彻底搞明白了整个网络库的构架与运行的回调过程,其中所有的是操作都是基于时间的,并且使用了timerfd来注册定时器。...
  • muduo库学习笔记(-)

    2018-03-13 14:54:57
    muduo库学习笔记 前言 code 注:个人水平有限,不能保证此文完全正确,如果错误,请不吝指出。 此文在观看《大并发服务器开发》视频时所做。 因此本文所讲基于muduo库0.9.1-beta版本,需要依赖cmake及...
  • EventLoopThreadPool也是派生自noncopyable, 其用于创建IO线程池,用于把...EventLoop我们应该很熟悉,也就是整个muduo库的核心,就是Reactor模式当中的Dispatcher(事件分发器),具体至EventLoop章节进行详细分析。
  • muduo库的模块还时比较多的,正确的梳理模块的交互,我觉得是看源码的第一步。
  • muduo库是基于boost开发的,所以需要先在Linux平台上安装boost库。 boost库的安装 Boost::asio还没有正式的成为C++标准库,因此如果使用Boost::asio进行网络I/O编程,需要先在当前系统平台上(linux)安装boost库...
  • Thread类:muduo库中的线程封装类。 类图: 私有成员变量: started:表示线程是否启动 pthreadId:Linux下的POSIX线程也有一个id,类型pthread_t,由pthread_self()取得,该id由线程库维护,其id空间是各个...
  • muduo库手册

    2014-11-18 11:45:51
    感觉不错,分享一下,它的使用手册,希望它对你有帮助
  • muduo库编译

    2019-03-01 14:10:57
    执行如下两条命令 ./build.sh -j2 ./build.sh install 在我编译echo例子时提示如下错误: “muduo/net/TcpServer.h: 没有那个文件或目录”,而这个目录... 所以我就自己将muduo目录链接到 /usr/include 下;另外...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,706
精华内容 2,282
关键字:

muduo库