精华内容
下载资源
问答
  • c++ 多线程 线程池服务器程序 和 客户端程序 ,这是一个已经测试过的例子,编译环境为vs2013,有兴趣的,可以下载运行学习
  • C++多线程以及线程池

    2020-06-10 16:11:46
    在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更指内核线程(kernel thread),而把用户线程(user thread)称为线程。   同一进程中的线程将共享该进程中的全部系统资源

    1 线程

    1.1 简介

      线程(英语:thread)是操作系统能够进行运算调度的最小单位。大部分情况下,它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。
      同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage)。一个进程可以有很多线程,每条线程并行执行不同的任务。在多核或多CPU,或支持Hyper-threading的CPU上使用多线程程序设计的好处是显而易见的,即提高了程序的执行吞吐率。在单CPU单核的计算机上,使用多线程技术,也可以把进程中负责I/O处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行,编写专门的workhorse线程执行密集计算,从而提高了程序的执行效率。

    1.2 线程的调度

      线程作为CPU调度最基本的单位其状态和进程基本相同:
    在这里插入图片描述

    • 新建状态:当线程被创建到开始执行任务之间的状态,一般对应的是线程对象被创建到执行start结构之间的过程;
    • 就绪状态:线程开始执行之后,只是表明线程准备好执行,需要CPU调度进行执行;
    • 运行状态:线程被CPU翻牌子,开始执行任务;
    • 阻塞状态:线程因为等待其他事件触发或者等待资源而发生阻塞,一般分为:
      • 等待阻塞:线程执行了wait
      • 同步阻塞:因为同步锁获取失败,而进入阻塞;
      • 其他阻塞:因等待获取IO资源而阻塞或者线程调用了sleep
    • 死亡状态:线程任务执行完成,可以被销毁。

      线程调度算法其实是CPU调度算法,一般有:

    1. 先来先服务(FCFS)算法;
    2. 时间片轮转调度算法;
    3. 短作业优先算法;
    4. 最短剩余时间优先算法;
    5. 高响应比优先算法;
    6. 优先级调度算法;
    7. 多级反馈队列调度算法;

    1.3 线程和进程的区别

    1. 线程是任务调度和执行的基本单位;进程是资源分配的基本单位;
    2. 单个线程可以包含多个线程,单个线程至少包含一个线程,不存在不在进程内的线程,也没有不存在线程的进程;
    3. 因为虚拟内存的关系不同的进程互相不知道对方的存在,相对独立,互相有独立的内存空间,地址空间;相同进程内的进程之间之间共享进程的内存空间,文件描述符表,部分寄存器等资源,但是每个线程都有自己独立的线程栈和部分寄存器,线程间访问资源需要考虑互斥问题;不同进程内的线程关系和不同进程关系相似,相对独立;
    4. 进程创建,销毁,切换代价大;线程创建,销毁,切换小。

      进程和线程的优缺点:

    • 进程有利于资源管理和保护,开销大;进程不利于资源管理和保护,需要使用锁保证线程的安全运行,开销小。

    1.4 线程的通信方式

      不同进程间的线程通信等同于进程通信,一般是五种通信方式:

    1. 消息队列;
    2. 共享内存;
    3. 有名管道/无名管道;
    4. 信号;
    5. scoket。

      相同进程中的不同线程因为资源共享不存在通信问题,主要是资源的互斥访问,一般需要对资源进行加锁以防死锁。

    1.5 内核线程和用户线程

      线程的实现方式分为内核线程和用户线程。
      内核级线程:由系统内核创建,撤销,调度的线程。
      用户级线程:由用户进程自行调度的线程。
      混合级线程:同时支持用户级线程和内核级线程。
      内核级线程和用户级线程的区别

    1. 内核线程需要操作系统支持,系统可知;用户级线程,系统调度资源分配是面向进程的,并不可知;
    2. 内核级线程创建、调度和撤销类似于进程;用户级线程创建和调度由用户进程控制,需要用户程序控制线程的调度工作;
    3. 内核级线程不同线程之间相对独立;用户线程中如果一个线程因其他原因而阻塞则导致整个进程阻塞,相同进程中的其他线程也被阻塞;
    4. 内核级线程资源竞争空间为全局;用户级线程资源竞争空间为当前进程。

      内核级线程优缺点

    • 优点:
      1. 多核CPU友好,能够实现整整意义上的多线程;
      2. 如果单个进程中的一个线程被阻塞,进程中的其他线程仍然能够进程切换运行;
      3. 所有阻塞线程的调用都以系统调用实现,代价较小;
    • 缺点:
      1. 由内核进行调度,用户的可定制性差;
      2. 线程在用户态运行,而线程的切换和调度相关操作在内核实现,进行线程切换时代价相对较高。

      用户级线程的优缺点

    • 优点:
      1. 线程调度由用户控制,不需要内核参与,控制灵活,相对可控;
      2. 不需要内核支持,可以在不支持多线程的系统中实现;
      3. 线程创建、销毁、调度和切换等管理操作的代价比内核级线程小;
      4. 线程能够利用的表空间和堆栈空间比内核线程多;
    • 缺点:
      1. 资源分配和调度按照进程进行,单个进程中同一时间只有一个线程运行,多处理机不友好;
      2. 当进程中的一个线程因缺页中断等原因阻塞时整个进程都会阻塞。

    2 多线程

      多线程(英语:multithreading),是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多于一个线程,进而提升整体处理性能。具有这种能力的系统包括对称多处理机、多核心处理器以及芯片级多处理(Chip-level multithreading)或同时多线程(Simultaneous multithreading)处理器。
      多线程一般分为软件多线程和硬件多线程,分别通过软件实现和硬件支持。软件支持的多线程无法实现整整意义上的并发,硬件支持的多线程能够实现整整意义上的并发。

    2.1 多线程模型

      多线程模型分为:多对一,一对一,多对多。
    在这里插入图片描述

      多对一模型:
      多个用户级线程映射到一个内核级线程。用户级线程对系统不可见,创建,调度,撤销的成本低效率高;当单个进程中的一个用户线程发生阻塞会导致进程阻塞。
      一对一模型:
      每一个用户进程映射到一个内核线程。并发能力强,不同线程之间相对独立;创建,调度,撤销需要调用系统调用,相对开销比较大。
      多对多模型:
      多个用户级线程映射到少许(<<用户线程数)内核线程。是对多对一和一对一模型的折中。

    2.2 硬件层面的多线程支持

      多线程的支持分为用户层,系统层(内核层),硬件层。其中只有硬件层的支持才能完全实现整整意义上的并发(同一时间多条指令运行),其他都只是感官上的多线程,实际上每个时刻只有一个线程在运行。
      硬件层面的多线程实现分为:粗粒度交替多线程,细粒度交替多线程和同步多线程。
      下图只是为了让读者更好的理解不同方式的区别,其中绿色和红色的框是随机画的并不完全准确。
    在这里插入图片描述

    2.2.1 粗粒度交替多线程

      粗粒度交替多线程中一个线程会一直运行到被一个外部事件导致无法继续运行为止才进行线程切换。该外部事件可以是:IO阻塞,程序分支,时间片结束等等高时延事件。
      硬件成本:为了高效的线程切换,寄存器需要有两个实例。
      许多微控制器与嵌入式处理器有多重的寄存器列,就能够在中断时快速环境切换。这样架构可以视为程序的线程与中断线程之间的块状多线程处理。

    2.2.2 细粒度交替多线程

      将CPU的周期轮转切换至不同的线程。更像是在时间片轮转的基础上再进行一次分割调度处理。
      硬件成本:除了讨论块状多线程的硬件成本,交错式多线程也因每层管线需要追踪运行中指令的线程代码而增加硬件成本。而且,当越来越多的线程同时在管线中运行,像是缓存与 TLB 等共享资源也要加大来避免不同线程之间的冲突。
      Intel Super-threading,Raza Microelectronics Inc XLR等处理器采用此方式。

    2.2.3 同步多线程

      同时多线程(Simultaneous multithreading,缩写SMT)也称同步多线程,是一种提高具有硬件多线程的超标量CPU整体效率的技术。同时多线程允许多个独立的执行线程更好地利用现代处理器架构提供的资源。

    超标量(superscalar)CPU架构是指在一颗处理器内核中实行了指令级并发的一类并发运算。这种技术能够在相同的CPU主频下实现更高的CPU流量(throughput)。处理器的内核中一般有多个执行单元(或称功能单元),如算术逻辑单元、位移单元、乘法器等等。未实现超标量体系结构时,CPU在每个时钟周期仅执行单条指令,因此仅有一个执行单元在工作,其它执行单元空闲。超标量体系结构的CPU在一个时钟周期可以同时分派(dispatching)多条指令在不同的执行单元中被执行,这就实现了指令级的并行。超标量体系结构可以视作多指令流多数据流。

      每个CPU周期,单个线程都会发布多个指令。
      硬件成本:交错式多线程如果不计硬件成本,SMT在每个管线层次结构的追踪线程指令会有多余的花费。而且,像是缓存与TLB这类共享的资源可能会因为多出来的线程而变得更大。
      Inter超线程,IBM Power5,AMD Bulldozer等都采用这种方式。

    2.3 Thread API

    2.3.1 C++11 API

      2011年8月12日,国际标准化组织(ISO)发布了第三个C++标准,即ISO/IEC 14882:2011,简称ISO C++ 11标准。该标准第一次把线程的概念引入C++标准库。Windows平台运行的VS2012和Linux平台运行的g++4.7,都完美支持C++11线程。

      标准库文档见:std::thread

    //构造函数
    thread() noexcept;                                  //构造不表示线程的thread对象
    thread( thread&& other ) noexcept;                  //移动拷贝构造函数,调用之后other不再是可执行线程
    template< class Function, class... Args >
    explicit thread( Function&& f, Args&&... args );    //构造可执行线程,f为需要线程执行的操作,args为可变参数,为f的参数
    thread(const thread&) = delete;                     //禁用thread的拷贝构造
    //析构函数
    ~thread()                                           //析构前需要调用join()和detach()
    
    //观察器
    bool joinable() const noexcept;                     //检查 std::thread 对象是否标识活跃的执行线程,若 thread 对象标识活跃的执行线程则为 true ,否则为 false
    std::thread::id get_id() const noexcept;            //返回线程的 id
    native_handle_type native_handle();                 //返回底层实现定义的线程句柄
    
    static unsigned int hardware_concurrency() noexcept;//返回支持的并发线程数。若该值非良定义或不可计算,则返回0,应该只把该值当做提示。
    
    
    //操作
    void join();                                        //阻塞当前线程直至 *this 所标识的线程结束其执行
    void detach();                                      //从 thread 对象分离执行线程,允许执行独立地持续。一旦该线程退出,则释放任何分配的资源。调用 detach 后 *this 不再占有任何线程
    void swap( std::thread& other ) noexcept;           //交换二个 thread 对象的底层柄
    

      示例:

    #include <thread>
    #include <iostream>
    #include <atomic>
    
    void func1(int n)
    {
        for (int i = 0; i < 5; i++)
        {
            std::cout << "$thread " << std::this_thread::get_id() << " " << n << std::endl;
            n++;
            std::this_thread::sleep_for(std::chrono::microseconds(1000));
        }
    }
    
    void func2(int &n)
    {
        for (int i = 0; i < 5; i++)
        {
            std::cout << "#thread " << std::this_thread::get_id() << " " << n << std::endl;
            n++;
            std::this_thread::sleep_for(std::chrono::microseconds(100));
        }
    }
    
    int thread_test()
    {
        int n = 0;
        n = 10;
        std::thread thread_rst; //仅仅是一个对象,不是一个线程
        std::thread thread_snd(func1, n);
        std::thread thread_thd(func2, std::ref(n));
    
        std::cout << thread_snd.get_id() << " joinable:" << thread_snd.joinable() << std::endl;
        std::cout << thread_thd.get_id() << " joinable:" << thread_thd.joinable() << std::endl;
    
        std::cout << thread_snd.get_id() << " native handle :" << thread_snd.native_handle() << std::endl;
        std::cout << thread_thd.get_id() << " native handle :" << thread_thd.native_handle() << std::endl;
        if (thread_snd.joinable())
            thread_snd.join();
    
        if (thread_thd.joinable())
            thread_thd.join();
    
        std::cout << thread_snd.get_id() << " joinable:" << thread_snd.joinable() << std::endl;
        std::cout << thread_thd.get_id() << " joinable:" << thread_thd.joinable() << std::endl;
    
        std::cout << "final value is " << n << std::endl;
        return 0;
    }
    

      输出:

    $thread 140737336633088 10
    #thread 140737328240384 10
    140737336633088 joinable:1
    140737328240384 joinable:1
    140737336633088 native handle :140737336633088
    140737328240384 native handle :140737328240384
    #thread 140737328240384 11
    #thread 140737328240384 12
    #thread 140737328240384 13
    #thread 140737328240384 14
    $thread 140737336633088 11
    $thread 140737336633088 12
    $thread 140737336633088 13
    $thread 140737336633088 14
    thread::id of a non-executing thread joinable:0
    thread::id of a non-executing thread joinable:0
    final value is 15
    

    2.3.2 C11 API

    //#include <thread.h>
    int thrd_create(thrd_t *thr, thrd_start_t func, void *arg);     //创建线程
    _Noreturn void thrd_exit( int res );                            //结束线程
    int thrd_join(thrd_t thr, int *res);                            //等待线程运行完毕
    thrd_t thrd_current();                                          //返回当前线程的线程标识符
    

    2.3.3 Win API

    //#include <windows.h>
    //创建用户级线程
    HANDLE WINAPI CreateThread(LPSECURITY_ATTRIBUTES lpThreadAttributes, SIZE_T dwStackSize, LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpParameter, DWORD dwCreationFlags, LPDWORD lpThreadId);
    //结束本线程
    VOID WINAPI ExitThread(DWORD dwExitCode);
    //挂起指定的线程
    DWORD WINAPI SuspendThread( HANDLE hThread );
    //恢复指定线程运行
    DWORD WINAPI ResumeThread(HANDLE hThread);
    //等待线程运行完毕
    DWORD WINAPI WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds);
    //返回当前线程的线程标识符
    DWORD WINAPI GetCurrentThreadId(void);
    //返回当前线程的线程句柄
    HANDLE WINAPI GetCurrentThread(void);
    

    2.3.4 Posix API

      详细内容见POSIX线程

    //#include <pthread.h>
    //创建用户级线程
    int pthread_create(pthread_t * thread, const pthread_attr_t * attr, void *(*start_routine)(void *), void *arg);
    //等待用户级线程
    int pthread_join(pthread_t thread, void ** retval);
    //退出用户级线程
    void pthread_exit(void *retval);
    //返回当前用户级线程的线程标识符
    pthread_t pthread_self(void);
    //用户级线程的取消
    int pthread_cancel(pthread_t thread);
    

    3 锁

    锁这一块儿,稍微了解下,感觉还没有完全吃透,还需要更多实践才行。

    3.1 死锁

      死锁:当两个以上的运算单元,双方都在等待对方停止运行,以获取系统资源,但是没有一方提前退出时,就称为死锁。在多任务操作系统中,操作系统为了协调不同行程,能否获取系统资源时,为了让系统运作,必须要解决这个问题。
      死锁的基本条件:

    • 禁止抢占(no preemption):系统资源不能被强制从一个进程中退出;
    • 持有和等待(hold and wait):一个进程可以在等待时持有系统资源;
    • 互斥(mutual exclusion):资源只能同时分配给一个进程或者线程,无法多个进程或者线程共享;
    • 循环等待(circular waiting):一系列进程互相持有其他进程所需要的资源。

      死锁的四个条件缺一不可,一般如果需要解决死锁,破坏其中一个即可。
      死锁理论上可以通过银行家算法进行预测,但是实际上系统资源量,需求量往往并不是能够准确估计的,因此死锁无法准确预测,只能尝试避免。

    3.2 Linux系统中的锁

      锁是为了避免多个运行单位对临界区的竞争访问,资源依赖进行限制,防止出现不可预见的错误,死锁等状况。
      可重入锁:当线程获取某个锁后,还可以继续获取它,可以递归调用,而不会发生死锁。
      不可重入锁:与可重入相反,获取锁后不能重复获取,否则会死锁(自己锁自己)。

    3.2.1 自旋锁

      自旋锁是用于多线程同步的一种锁,线程反复检查锁变量是否可用。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。自旋锁最多只能被一个可执行线程持有。如果一个执行线程试图获得一个被已经持有(争用)的自旋锁,那么该线程就会一直进行忙循环-旋转-等待锁重新可用要是锁未被争用,请求锁的执行线程就可以立即得到它,继续执行。

    //#include <spinlock.h>
    spin_lock()             //获取指定的自旋锁
    spin_lock_irq()         //禁止本地中断并获取指定的锁
    spin_lock_irqsave()     //保存本地中断当前状态,禁止本地中断,并获取指定的锁
    spin_unlock()           //释放指定的锁
    spin_unlock_irq()       //释放指定的锁,并激活本地中断
    spin_unlock_irqrestore()//释放指定的锁,并让本地中断恢复到以前的状态
    spin_lock_init()        //动态初始化指定的spinlock_t
    spin_trylock()          //试图获取指定的锁,如果未获取,则返回0
    spin_is_locked()        //如果指定的锁当前正在被获取,则返回非0,否则返回0
    

    3.2.2 互斥锁

      互斥锁(Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。

    pthread_mutex_t lock; // 互斥锁定义
    pthread_mutex_init(&lock, NULL); // 动态初始化,	成功返回0,失败返回非0
    pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; // 静态初始化
    pthread_mutex_lock(&lock); // 阻塞的锁定互斥锁
    pthread_mutex_trylock(&thread_mutex)// 非阻塞的锁定互斥锁,成功获得互斥锁返回0,如果未能获得互斥锁,立即返回一个错误码
    pthread_mutex_unlock(&lock)// 解锁互斥锁
    pthread_mutex_destroy(&lock) // 销毁互斥锁
    

    3.2.3 读写锁

      读写锁是计算机程序的并发控制的一种同步机制,也称“共享-互斥锁”、多读者-单写者锁。多读者锁,push lock用于解决读写问题(readers–writers problem)。读操作可并发重入,写操作是互斥的。读写锁通常用互斥锁、条件变量、信号量实现。

    //#include <pthread.h>
    int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);          //初始化读写锁
    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);   //销毁读写锁
    int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);    //读加锁
    int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);    //写加锁
    int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);    //解锁
    int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); //非阻塞获得读锁
    int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); //非阻塞获得写锁
    

    3.2.4 RCU锁

      RCU锁是对读写锁的一种改进,在性能上相比更加高效。
      RCU(Read-Copy Update),对于被RCU保护的共享数据结构,读者不需要获得任何锁就可以访问它,但写者在访问它时首先拷贝一个副本,然后对副本进行修改,最后使用一个回调(callback)机制在适当的时机把指向原来数据的指针重新指向新的被修改的数据。这个时机就是所有引用该数据的CPU都退出对共享数据的操作。RCU实际上是一种改进的rwlock,读者几乎没有什么同步开销,它不需要锁。(CPU发生了上下文切换称为经历一个quiescent state)

    rcu_read_lock()             //读者在读取由RCU保护的共享数据时使用该函数标记它进入读端临界区
    rcu_read_unlock()           //该函数与rcu_read_lock配对使用,用以标记读者退出读端临界区
    synchronize_rcu()           //该函数由RCU写端调用,它将阻塞写者,直到经过grace period后,即所有的读者已经完成读端临界区,写者才可以继续下一步操作
    synchronize_kernel()        //其他非RCU的内核代码使用该函数来等待所有CPU处在可抢占状态,目前功能等同于synchronize_rcu,但现在已经不建议使用,而使用synchronize_sched
    synchronize_sched()         //该函数用于等待所有CPU都处在可抢占状态,它能保证正在运行的中断处理函数处理完毕,但不能保证正在运行的softirq处理完毕
    void fastcall call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
                                //函数 call_rcu 也由 RCU 写端调用,它不会使写者阻塞,因而可以在中断上下文或 softirq 使用,而 synchronize_rcu、synchronize_kernel 和synchronize_shced 只能在进程上下文使用。该函数将把函数 func 挂接到 RCU回调函数链上,然后立即返回。一旦所有的 CPU 都已经完成端临界区操作,该函数将被调用来释放删除的将绝不在被应用的数据
    void fastcall call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
                                //函数call_ruc_bh功能几乎与call_rcu完全相同,唯一差别就是它把softirq的完成也当作经历一个quiescent state,因此如果写端使用了该函数,在进程上下文的读端必须使用rcu_read_lock_bh
    #define rcu_dereference(p)     ({ \
                                    typeof(p) _________p1 = p; \
                                    smp_read_barrier_depends(); \
                                    (_________p1); \
                                    })
                                //该宏用于在RCU读端临界区获得一个RCU保护的指针,该指针可以在以后安全地引用,内存栅只在alpha架构上才使用
    static inline void list_add_rcu(struct list_head *new, struct list_head *head)
                                //该函数把链表项new插入到RCU保护的链表head的开头
    static inline void list_add_tail_rcu(struct list_head *new,struct list_head *head)
                                //该函数类似于list_add_rcu,它将把新的链表项new添加到被RCU保护的链表的末尾
    static inline void list_del_rcu(struct list_head *entry)
                                //该函数从RCU保护的链表中移走指定的链表项entry,并且把entry的prev指针设置为LIST_POISON2,但是并没有把entry的next指针设置为LIST_POISON1,因为该指针可能仍然在被读者用于便利该链表
    static inline void list_replace_rcu(struct list_head *old, struct list_head *new)
                                //该函数是RCU新添加的函数,并不存在非RCU版本。它使用新的链表项new取代旧的链表项old,内存栅保证在引用新的链表项之前,它的链接指针的修正对所有读者可见
    list_for_each_rcu(pos, head)//该宏用于遍历由RCU保护的链表head,只要在读端临界区使用该函数,它就可以安全地和其它_rcu链表操作函数
    list_for_each_safe_rcu(pos, n, head)
                                //该宏类似于list_for_each_rcu,但不同之处在于它允许安全地删除当前链表项pos
    list_for_each_entry_rcu(pos, head, member)
                                //该宏类似于list_for_each_rcu,不同之处在于它用于遍历指定类型的数据结构链表,当前链表项pos为一包含struct list_head结构的特定的数据结构
    list_for_each_continue_rcu(pos, head)
                                //该宏用于在退出点之后继续遍历由RCU保护的链表head
    static inline void hlist_del_rcu(struct hlist_node *n)
                                //它从由RCU保护的哈希链表中移走链表项n,并设置n的ppre指针为LIST_POISON2,但并没有设置next为LIST_POISON1,因为该指针可能被读者使用用于遍利链表
    static inline void hlist_add_head_rcu(struct hlist_node *n,struct hlist_head *h)
                                //该函数用于把链表项n插入到被RCU保护的哈希链表的开头,但同时允许读者对该哈希链表的遍历。内存栅确保在引用新链表项之前,它的指针修正对所有读者可见
    hlist_for_each_rcu(pos, head)//该宏用于遍历由RCU保护的哈希链表head,只要在读端临界区使用该函数,它就可以安全地和其它_rcu哈希链表操作函数(如hlist_add_rcu)并发运行
    hlist_for_each_entry_rcu(tpos, pos, head, member)
                                //类似于hlist_for_each_rcu,不同之处在于它用于遍历指定类型的数据结构哈希链表,当前链表项pos为一包含struct list_head结构的特定的数据结构
    

    3.2.5 条件变量

      条件变量是用来等待线程而不是上锁的,通常和互斥锁一起使用。互斥锁的一个明显的特点就是某些业务场景中无法借助系统来唤醒,仍然需要业务代码使用while来判断,这样效率本质上比较低。而条件变量通过允许线程阻塞和等待另一个线程发送信号来弥补互斥锁的不足,所以互斥锁和条件变量通常一起使用,来让条件变量异步唤醒阻塞的线程。

    int pthread_cond_init(pthread_cond_t *cond,pthread_condattr_t *cond_attr);
    int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
    int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
    int pthread_cond_destroy(pthread_cond_t *cond);
    int pthread_cond_signal(pthread_cond_t *cond);
    int pthread_cond_broadcast(pthread_cond_t *cond);  //解除所有线程的阻塞
    

    4 线程池

    4.1 简介

      线程池(thread pool)是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
      任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。

    4.2 线程池模式

      线程池模式分为:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。

    • 半同步/半异步模式又称为生产者消费者模式,是比较常见的实现方式,比较简单。分为同步层、队列层、异步层三层。同步层的主线程处理工作任务并存入工作队列,工作线程从工作队列取出任务进行处理,如果工作队列为空,则取不到任务的工作线程进入挂起状态。由于线程间有数据通信,因此不适于大数据量交换的场合;
    • 领导者跟随者模式,在线程池中的线程可处在3种状态之一:领导者leader、追随者follower或工作者processor。任何时刻线程池只有一个领导者线程。事件到达时,领导者线程负责消息分离,并从处于追随者线程中选出一个来当继任领导者,然后将自身设置为工作者状态去处置该事件。处理完毕后工作者线程将自身的状态置为追随者。这一模式实现复杂,但避免了线程间交换任务数据,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了领导者跟随者模式实现。

    4.3 线程池的优点

    1. 提高资源利用率。可以重复使用已经创建了的线程资源;
    2. 提高响应速度。当线程池中的线程没有超过线程池的最大上限时,有的线程处于等待分配任务状态,当任务到来时,无需创建线程就能被执行;
    3. 高度可控,可管理。线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。

      线程池的伸缩性对性能有较大的影响:

    • 创建太多线程,将会浪费一定的资源,有些线程未被充分使用;
    • 销毁太多线程,将导致之后浪费时间再次创建它们;
    • 创建线程太慢,将会导致长时间的等待,性能变差;
    • 销毁线程太慢,导致其它线程资源饥饿。

    4.4 简单的线程池实现

    #include <vector>
    #include <mutex>
    #include <thread>
    #include <functional>
    #include <queue>
    #include <atomic>
    #include <condition_variable>
    #include <iostream>
    
    class thread_pool
    {
    private:
        using task = std::function<void()>;
    
        std::atomic_bool _is_running; //显示线程池的状态
        std::mutex _mutex;
        std::condition_variable _cond;
        size_t _thread_no;
        std::vector<std::thread> _threads;
        std::queue<task> _tasks;
    
    public:
        thread_pool(const thread_pool &tp) = delete;
        thread_pool &operator=(const thread_pool &tp) = delete;
    
    public:
        thread_pool(size_t no)
            : _thread_no(no),
              _is_running(false)
        {
        }
    
        ~thread_pool()
        {
            if (_is_running)
                stop();
        }
    
    private:
        void work()
        {
            std::cout << "thread " << std::this_thread::get_id() << " begin to work!" << std::endl;
            while(_is_running)
            {
                task t;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    if(!_tasks.empty())
                    {
                        t = _tasks.front();
                        _tasks.pop();
                    }
                    else if(_is_running && _tasks.empty())
                    {
                        _cond.wait(lock);
                    }
                }
    
                if(t)
                {
                    t();
                }
            }
    
            std::cout << "thread " << std::this_thread::get_id() << " going to die!" << std::endl;
        }
    
    public:
        void start()
        {
            this->_is_running = true;
            for (size_t i = 0; i < _thread_no; i++)
            {
                _threads.emplace_back(std::thread(&thread_pool::work, this));
            }
        }
    
        void stop()
        {
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _is_running = false;
                _cond.notify_all();
            }
    
            for (std::thread &thd : _threads)
            {
                if (thd.joinable())
                {
                    thd.join();
                }
            }
        }
    
        void append_task(const task &t)
        {
            if (_is_running)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _tasks.push(t);
                _cond.notify_one();
            }
        }
    };
    
    void func1()
    {
        std::cout << "$thread working in func1 in thread " << std::this_thread::get_id() << std::endl;
    }
    
    void func2(int n)
    {
        std::cout << "#thread working in func2 in thread " << std::this_thread::get_id() << ";n is " << n << std::endl;
    }
    
    void thread_pool_test()
    {
        thread_pool pool(10);
        pool.start();
        for (int i = 0; i < 20; i++)
        {
            pool.append_task(std::bind(func2, i));
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    
        pool.stop();
    }
    

      输出:

    thread 140737336633088 begin to work!
    thread 140737328240384 begin to work!
    thread 140737319847680 begin to work!
    thread 140737311454976 begin to work!
    thread 140737303062272 begin to work!
    thread 140737219917568 begin to work!
    thread 140737211524864 begin to work!
    thread 140737203132160 begin to work!
    thread 140737194739456 begin to work!
    thread 140737186346752 begin to work!
    #thread working in func2 in thread 140737336633088;n is 0
    #thread working in func2 in thread 140737328240384;n is 1
    #thread working in func2 in thread 140737319847680;n is 2
    #thread working in func2 in thread 140737311454976;n is 3
    #thread working in func2 in thread 140737303062272;n is 4
    #thread working in func2 in thread 140737219917568;n is 5
    #thread working in func2 in thread 140737211524864;n is 6
    #thread working in func2 in thread 140737203132160;n is 7
    #thread working in func2 in thread 140737194739456;n is 8
    #thread working in func2 in thread 140737186346752;n is 9
    #thread working in func2 in thread 140737336633088;n is 10
    #thread working in func2 in thread 140737328240384;n is 11
    #thread working in func2 in thread 140737319847680;n is 12
    #thread working in func2 in thread 140737311454976;n is 13
    #thread working in func2 in thread 140737303062272;n is 14
    #thread working in func2 in thread 140737219917568;n is 15
    #thread working in func2 in thread 140737211524864;n is 16
    #thread working in func2 in thread 140737203132160;n is 17
    #thread working in func2 in thread 140737194739456;n is 18
    #thread working in func2 in thread 140737186346752;n is 19
    thread 140737336633088 going to die!
    thread 140737328240384 going to die!
    thread 140737319847680 going to die!
    thread 140737303062272 going to die!
    thread 140737311454976 going to die!
    thread 140737211524864 going to die!
    thread 140737203132160 going to die!
    thread 140737194739456 going to die!
    thread 140737219917568 going to die!
    thread 140737186346752 going to die!
    

    5 参考

    展开全文
  • c++ 多线程 线程池 类 实现一个线程池,供用户使用里面的现成
  • 利用c++11标准库实现的线程线程池样例程序,可调用任意参数的函数作为线程任务,支持类成员函数的调用。
  • 线程池(C/C++版)

    2015-02-16 11:20:43
    C/C++ threadpool封装, 线程池, Linux, 多线程, pthread
  • 在高效C++编程中看到一个不错的内存池实现方案,与大家共享: (一)单线程内存池代码实现如下: template class CMemoryPool {  public:  enum { EXPANSION_SIZE = 32};  CMemoryPool(unsigned int ...

    在高效C++编程中看到一个不错的内存池实现方案,与大家共享:

    (一)单线程内存池代码实现如下:
    template<typename T>
    class CMemoryPool
    {
        public:
            enum { EXPANSION_SIZE = 32};

           CMemoryPool(unsigned int nItemCount = EXPANSION_SIZE)
            {
               ExpandFreeList(nItemCount);
            }
           
            ~CMemoryPool()
            {
                //free allmemory in the list
               CMemoryPool<T>* pNext = NULL;
                for(pNext =m_pFreeList; pNext != NULL; pNext = m_pFreeList)
                {
                   m_pFreeList = m_pFreeList->m_pFreeList;
                   delete [](char*)pNext;
                }
            }

           void* Alloc(unsigned int /*size*/)
            {
               if(m_pFreeList == NULL)
                {
                   ExpandFreeList();
                }
               
                //get freememory from head
               CMemoryPool<T>* pHead = m_pFreeList;
                m_pFreeList= m_pFreeList->m_pFreeList;
                returnpHead;
            }

           void Free(void* p)
            {
                //push thefree memory back to list
               CMemoryPool<T>* pHead = static_cast<CMemoryPool<T>*>(p);
               pHead->m_pFreeList = m_pFreeList;
                m_pFreeList= pHead;
            }

        protected:
            //allocate memory and push to thelist
            void ExpandFreeList(unsignednItemCount = EXPANSION_SIZE)
            {
                unsigned intnSize = sizeof(T) > sizeof(CMemoryPool<T>*) ? sizeof(T) :sizeof(CMemoryPool<T>*);
               CMemoryPool<T>* pLastItem =static_cast<CMemoryPool<T>*>(static_cast<void*>(newchar[nSize]));
                m_pFreeList= pLastItem;
                for(int i=0;i<nItemCount-1; ++i)
                {
                   pLastItem->m_pFreeList = static_cast<CMemoryPool<T>*>(static_cast<void*>(newchar[nSize]));
                   pLastItem = pLastItem->m_pFreeList;
                }

               pLastItem->m_pFreeList = NULL;
            }

        private:
            CMemoryPool<T>* m_pFreeList;
    };

    它的实现思想就是每次从List的头上取内存,如果取不到则重新分配一定数量; 用完后把内存放回List头部,这样的话效率很高,因为每次List上可以取到的话,肯定是空闲的内存。

    当然上面的代码只是针对单线程的,要支持多线程的话也很简单,外面加一层就可以了
    (二)多线程内存池代码实现如下:
    class CCriticalSection
    {
    public:
        CCriticalSection()
        {
           InitializeCriticalSection(&m_cs);
        }

        ~CCriticalSection()
        {
            DeleteCriticalSection(&m_cs);
        }

        voidLock()
        {
            EnterCriticalSection(&m_cs);
        }

        voidUnlock()
        {
            LeaveCriticalSection(&m_cs);
        }

    protected:
        CRITICAL_SECTION m_cs;
    };

    template<typename POOLTYPE,typename LOCKTYPE>
    class CMTMemoryPool
    {
        public:
            void* Alloc(unsigned int size)
            {
                void* p =NULL;
               m_lock.Lock();
                p =m_pool.Alloc(size);
               m_lock.Unlock();

               return p;
            }

           void Free(void* p)
            {
               m_lock.Lock();
               m_pool.Free(p);
               m_lock.Unlock();   
            }

        private:
            POOLTYPE m_pool;
            LOCKTYPE m_lock;
    };

    //Test code
    #include <iostream>
    #include <windows.h>

    using namespace std;

    #include"MemoryPool.h"
    #include "MTMemoryPool.h"

    class CTest
    {
    public:
        int m_n;
        int m_n1;

        void*operator new(size_t size)
        {
            void* p = s_pool->Alloc(size);
            return p;
        }

        voidoperator delete(void* p, size_t size)
        {
            s_pool->Free(p);
        }

        staticvoid NewPool()
        {
            //s_pool = newCMemoryPool<CTest>;
            s_pool = newCMTMemoryPool<CMemoryPool<CTest>, CCriticalSection>;
        }

        staticvoid DeletePool()
        {
            delete s_pool;
            s_pool = NULL;
        }
       
        //static CMemoryPool<CTest>* s_pool;
        static CMTMemoryPool<CMemoryPool<CTest>,CCriticalSection>* s_pool;
    };

    //CMemoryPool<CTest>*CTest::s_pool = NULL;
    CMTMemoryPool<CMemoryPool<CTest>, CCriticalSection>* CTest::s_pool= NULL;

    void testFun()
    {
        int i;
        const int nLoop = 10;
        const int nCount = 10000;
       
        for(int j = 0; j<nLoop; ++j)
        {
            typedef CTest* LPTest;
            LPTest arData[nCount];
            for(i=0;i <nCount; ++i)
            {
                arData[i] =new CTest;
            }

           for(i=0;i <nCount; ++i)
            {
                deletearData[i];
            }
        }
    }

    int main(int argc, char*argv[])
    {
        {
            unsigned int dwStartTickCount =GetTickCount();

           CTest::NewPool();

           testFun();
           
            CTest::DeletePool();
           
            cout << "total cost"<< GetTickCount() - dwStartTickCount << endl;
        }


        system("pause");

        return 0;
    }

     

    展开全文
  • 学习计划 每天学习一小时以上 跟着视频动手编写代码 调试代码并对比课程多提供的源码 课程目标 理解多线程原理并学会c++11 的多线程编程 理解线程池技术原理并能使用c++实现 理解c++11 14 17 20 多线程编程相关特性 ...
  • 文章目录多线程线程池c++实现1. linux pthread库中对线程的操作1.1 线程的创建和资源回收1.2 线程的互斥和同步2. 生产者-消费者的多线程模型3. 线程池3.1 为什么需要线程池?3.2 线程池需要解决什么技术问题?3.3 ...

    多线程和线程池的c++实现

    1. linux pthread库中对线程的操作

    1.1 线程的创建和资源回收

    每一次调用pthread_create()都会创建一个子线程,如果子线程是joinable,则
    必须显式调用pthread_detach()将其变为non-joinable自行释放资源
    or 显式调用pthread_join()由主线程为其释放资源,否则会造成内存泄露.
    默认创建的线程是joinable的.

    这里的资源到底是啥?

    1. 子线程从父线程拷贝的栈内存,使用pthread_join()由父线程清理或
      pthread_detach()由系统清理,如pthread_create之前父线程中的局部变量
    2. 子线程自己申请的堆内存,使用清理函数pthread_cleanup_push()
      和pthread_cleanup_pop(), 如线程内部malloc或new出的空间
      参见https://www.cnblogs.com/cthon/p/9078042.html, TODO

    线程执行的函数其中的参数,返回值,局部变量在线程执行完毕离开函数后均会自动
    释放,不在这里所说的资源范围内

    如果主线程想要使用子线程的结果,则不能自顾自的直接返回,由2种可选方式:

    1. pthread_join()阻塞主线程直到子线程返回释放子线程的资源
      优点: 主线程阻塞,不占用cpu资源
      缺点: 有些业务情境下不希望主线程阻塞,主线程需要做其他的事情
    2. pthread_detach()由子线程自己结束后自行释放资源,主线程使用while (true)
      死循环持续运行
      优点: 主线程并未阻塞,可以处理其他事情
      缺点: 主线程持续占用cpu资源

    为什么pthread_create()传入的函数为void* func(void* arg)形式?
    实际使用时不同场景传入的参数类型不一致,数目不一致,因此统一使用void*类型,简单的
    单个参数直接转换类型传给形参,复杂的参数先构建结构体,然后传入指向该结构体的指针

    1.2 线程的互斥和同步

    pthread_mutex_lock发生了什么?
    各个线程竞争上锁,竞争成功则进入临界区称为运行状态,竞争失败则进入阻塞状态
    竞争: cpu唤醒所有等待的线程,使他们变为就绪,
    竞争成功: 被cpu调度执行,进入运行状态,开始执行临界区代码
    竞争失败: 阻塞,进入睡眠状态,不占用cpu资源,同时被放入等待资源的优先级队列

    pthread_cond_wait(&cond, &mutex)发生了什么?

    1. 条件不满足,阻塞,进入等待队列并释放锁,一气呵成,原子操作
    2. 条件满足(允许生产,stock<10),从阻塞状态唤醒进入就绪状态,竞争上锁,分步进行,
      非原子操作
      a) 上锁成功,判断stock<10,退出while循环,执行生产动作
      b) 上锁失败,再次判断stock,如果stock<10,则阻塞在mutex上;如果stock>=10,则执行
      条件不满足的动作,阻塞在条件变量上

    虽然都是阻塞,但是阻塞的位置是不同的,阻塞在条件变量上是等待条件满足,为了同步
    阻塞在mutex上是等待其他线程释放锁,为了互斥访问

    pthread_cond_signal(&cond)发生了什么?
    我操作完了,可能会使得消费者等待的条件满足,因此发出信号(条件不一定满足,
    满足不满足需要另一方的消费者自己写代码判断,因为条件是另一方定的)

    2. 生产者-消费者的多线程模型

    生产者-消费者模型,二者共享的资源是有限容量的仓库,仓库空时暂停消费,仓库满时
    暂停生产,对生产和消费二者的同步通过这两个条件变量实现;

    生产-生产,消费-消费,生产-消费都需要互斥访问,因为无论生产还是消费都是写操作,
    使用互斥锁来实现对仓库容量的互斥访问

    3. 线程池

    3.1 为什么需要线程池?

    为了充分利用cpu的计算资源(不让cpu闲下来),使用多线程执行任务;
    但cpu的核心数有限,不可能来一个任务创建一个线程来执行,因为这样的代价是:
    a) 创建的线程数高于cpu的核心数,如果任务的执行时间较长,由于系统的调度机制,一个
    线程还没执行完,另一个线程开始被调度执行,cpu频繁在线程间切换,浪费时间
    b) 来一个任务创建一个线程,创建线程也需要时间
    c) 任务不是一直在高峰状态,任务低谷时间段大量的线程闲置,消耗内存资源

    线程池是执行效率和内存开销之间权衡的产物,预先创建一定数量的线程,放进一个池子里,
    当来了任务之后,放进等待队列中,如果队列不为空,则向线程池发信号.其中的线程竞争
    执行任务,执行完毕后将任务从等待队列中移除

    虽然线程池预先分配了系统内存资源,但

    1. 避免了持续创建新线程的开销
    2. 虽然无法避免cpu在线程间的切换,但这种切换是可控的,因为线程的数目是我们自己定的

    3.2 线程池需要解决什么技术问题?

    1) 如何定义一个任务?

    任务可理解为 函数+输入数据,一个任务提供了他需要处理的数据和处理这些数据的方法
    就像去修车铺修自行车,我需要向修车师傅(线程)提供我的漏气自行车(数据)和我想对
    这个漏气自行车执行的操作(修理漏气自行车),如果需要返回数据,使用输出参数

    因此可定义一个类,数据成员包括
    a) 一个函数
    b) 要处理的数据
    成员方法:
    Run(): 调用函数成员(传入要处理的数据)

    为了方便扩展任务的类型,可定义抽象基类Task,定义Run()接口,子类自行提供任务要
    调用的函数和处理的数据

    2) 如何调度任务的执行?
    把任务放进优先级队列中,线程池根据优先级取出任务执行

    3) 任务队列如何通知线程池中的线程?
    啊!这不刚好使用条件变量吗

    3.3 线程池类应当如何设计?

    1. 数据成员
    • 线程池大小
    • 存放线程的vector
    • 存放任务队列的deque(目前只采用简单的先入先出策略,未使用优先级队列)
    • 静态成员: 互斥锁(执行任务之间的线程互斥,添加任务-执行任务的互斥)
    • 静态成员: 条件变量(任务队列是否为空)
    1. 接口
    • 构造函数调用Create()创建给定数目的线程
    • AddTask() 添加任务并发出信号
    • StopAll() 调用pthread_join回收资源
    1. 方法
    • Create()创建的线程存进vector,创建线程的pthread_create调用
      静态 void* Run(void* arg)方法
    • Run()阻塞等待任务队列非空,执行任务,并将任务从任务队列中删除

    为什么mutex, cond, task_queue都要使用static修饰?

    1. 需要将其放在类内,不能使用全局变量
    2. 类内static修饰的变量与类外的全局变量等效

    原因: 多个线程之间共享数据:
    在一个文件的范围内,多个线程执行同样的函数,该函数访问数据,该数据对不同函数
    是共享的,因此从编程的逻辑来看数据的共享不是执行同一个函数的多个线程对函数内
    定义的变量的共享,而是多个函数的共享,实际上每个线程都有自己的栈空间,栈空间
    存放的是函数内定义的局部变量的拷贝

    命名空间与文件对应,类与函数对应,类中的数据成员相当于函数内的局部变量,多个
    线程执行同一个类的成员函数(当然必须为static函数),共享的数据应当是不同类共享
    的数据,因此需要声明为static,类内成员函数对数据成员的共享是类相比于函数额外
    增加的优势

    展开全文
  • c++ 线程池 即用版

    2018-12-21 15:14:07
    c++ 实现的线程池工程,可直接使用,其中线程数目可根据空闲情况自增减,并且实现线程跟任务无关联。
  • c++11多线程线程池

    2018-12-09 13:56:00
    最近需要开发一个高性能计算库,涉及到c++多线程的应用,上次做类似的事情已经是4年多以前了,印象中还颇有些麻烦。悔当初做了就算了,也没想着留点记录什么的。这次又研究了一番,发现用上c++11特性之后,现在已经...

    最近需要开发一个高性能计算库,涉及到c++多线程的应用,上次做类似的事情已经是4年多以前了,印象中还颇有些麻烦。悔当初做了就算了,也没想着留点记录什么的。这次又研究了一番,发现用上c++11特性之后,现在已经比较简单了,在此记录一下。

     

    最简单的多线程情况,不涉及公共变量,各个线程之间独立运行,主线程只负责传入参数并接收运行结果。这种情况也是多线程性能最好的场景,因为不涉及锁的问题。之前c++标准库对多线程支持并不好,要么用boost线程库,要么用操作系统提供的线程自己轮。自从c++11之后,标准库对多线程支持好多了。下面给出一个简单的多线程示例代码。

    #include <thread>
    #include <future>
    #include <vector>
    #include <iostream>
    using namespace std;
    
    struct result {
    	vector<double> x;
    	double y;
    };
    
    void f1(promise<result> &res)
    {
    	vector<double> vec(2);
    	vec[0] = 1.5;
    	vec[1] = 2.2;
    	res.set_value({ vec, 4.7 });
    }
    
    double f2(double x)
    {
    	return sin(x);
    }
    
    int main(int argc, char* argv[])
    {
    	promise<result> res1;
    	packaged_task<double(double)> res2(f2);
    	future<result> ft1 = res1.get_future();
    	future<double> ft2 = res2.get_future();
    	thread th1(f1, ref(res1));       // 线程参数均为copy,ref代表引用,必须显式给出
    	thread th2(move(res2), 3.14/6);  // 为了提高性能采用move
    
    	ft1.wait();
    	ft2.wait();
    
    	result r1 = ft1.get();
    	double r2 = ft2.get();
    
    	th1.join();  // 这两句对此例不必要,仅为了逻辑自洽给出
    	th2.join();
    
    	cout << "result: x = (" << r1.x[0] << ", " << r1.x[1] << "), y = " << r1.y << endl;
    	cout << "sin(pi/6) = " << r2 << endl;
    	return 0;
    }
    
    

    这段代码分别用了promise和packaged_task两种方式来新建线程并获得返回值。注意新建的线程通过复制参数来实现,因此对于引用的参数要用std::ref来声明,否则无法获得返回值。

     

    对于大规模的计算,通常都把计算划分为小块,然后塞给新线程。小块计算的数量会很多,频繁创建和销毁线程的开销也很大,因此通常的做法是利用线程池。顾名思义,线程池中有若干线程,当有新任务来,就把任务分给线程运行,运行结束后该线程再等候下一个任务。下面给出线程池的实现,

    ThreadPool.hpp

    #pragma once
    #ifndef THREAD_POOL_H
    #define THREAD_POOL_H
    
    #include <vector>
    #include <queue>
    #include <thread>
    #include <atomic>
    #include <condition_variable>
    #include <future>
    
    namespace ThreadPool
    {
    #define MAX_THREAD_NUM 8
    
    	//线程池,可以提交变参函数或lambda表达式的匿名函数执行,可以获取执行返回值
    	//支持类成员函数,支持类静态成员函数或全局函数,Operator()函数等
    	class ThreadPool
    	{
    		typedef std::function<void()> Task;
    	private:
    		std::vector<std::thread> m_pool;     // 线程池
    		std::queue<Task> m_tasks;    // 任务队列
    		std::mutex m_lock;    // 同步锁
    		std::condition_variable m_cv;   // 条件阻塞
    		std::atomic<bool> m_isStoped;    // 是否关闭提交
    		std::atomic<int> m_idleThreadNum;  //空闲线程数量
    	public:
    		ThreadPool(int size = MAX_THREAD_NUM) : m_isStoped(false)
    		{
    			size = size > MAX_THREAD_NUM ? MAX_THREAD_NUM : size;
    			m_idleThreadNum = size;
    			for (int i = 0; i < size; i++)
    			{
    				//初始化线程数量
    				m_pool.emplace_back(&ThreadPool::scheduler, this);
    			}
    		}
    
    		~ThreadPool()
    		{
    			Close();
    			while (!m_tasks.empty()) {
    				m_tasks.pop();
    			}
    			m_cv.notify_all();  // 唤醒所有线程执行
    			for (std::thread& thread : m_pool) {
    				if (thread.joinable()) {
    					thread.join();  // 等待任务结束,前提是线程一定会执行完
    				}
    			}
    			m_pool.clear();
    		}
    
    		// 打开线程池,重启任务提交
    		void ReOpen() {
    			if (m_isStoped) m_isStoped.store(false);
    			m_cv.notify_all();
    		}
    
    		// 关闭线程池,停止提交新任务
    		void Close() {
    			if (!m_isStoped) m_isStoped.store(true);
    		}
    
    		// 判断线程池是否被关闭
    		bool IsClosed() const {
    			return m_isStoped.load();
    		}
    
    		// 获取当前任务队列中的任务数
    		int GetTaskSize() { 
    			return m_tasks.size(); 
    		}
    
    		// 获取当前空闲线程数
    		int IdleCount() { 
    			return m_idleThreadNum; 
    		}
    
    		// 提交任务并执行
    		// 调用方式为 std::future<returnType> var = threadpool.Submit(...)
    		// var.get() 会等待任务执行完,并获取返回值
    		// 其中 ... 可以直接用函数名+函数参数代替,例如 threadpool.Submit(f, 0, 1)
    		// 但如果要调用类成员函数,则最好用如下方式
    		// threadpool.Submit(std::bind(&Class::Func, &classInstance)) 或
    		// threadpool.Submit(std::mem_fn(&Class::Func), &classInstance)
    		template<class F, class... Args>
    		auto Submit(F&& f, Args&&... args)->std::future<decltype(f(args...))>
    		{
    			if (m_isStoped.load()) {
    			  throw std::runtime_error("ThreadPool is closed, can not submit task.");
    			}
    
    			using RetType = decltype(f(args...));  // typename std::result_of<F(Args...)>::type, 函数 f 的返回值类型
    			std::shared_ptr<std::packaged_task<RetType()>> task = std::make_shared<std::packaged_task<RetType()>>(
    				std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    				);
    			std::future<RetType> future = task->get_future();
    			// 封装任务并添加到队列
    			addTask([task](){
    				(*task)(); 
    			});
    
    			return future;
    		}
    	private:
    		// 消费者
    		Task getTask() {
    			std::unique_lock<std::mutex> lock(m_lock); // unique_lock 相比 lock_guard 的好处是:可以随时 unlock() 和 lock()
    			while (m_tasks.empty() && !m_isStoped) {
    				m_cv.wait(lock);
    			}  // wait 直到有 task
    			if (m_isStoped) {
    				return Task();
    			}
    			assert(!m_tasks.empty());
    			Task task = std::move(m_tasks.front()); // 取一个 task
    			m_tasks.pop();
    			m_cv.notify_one();
    			return task;
    		}
    
    		// 生产者
    		void addTask(Task task)
    		{
    			std::lock_guard<std::mutex> lock{ m_lock }; //对当前块的语句加锁, lock_guard 是 mutex 的 stack 封装类,构造的时候 lock(),析构的时候 unlock()
    			m_tasks.push(task);
    			m_cv.notify_one(); // 唤醒一个线程执行
    		}
    
    		// 工作线程主循环函数
    		void scheduler()
    		{
    			while (!m_isStoped.load()) {
    				// 获取一个待执行的 task
    				Task task(getTask());
    				if (task) {
    					m_idleThreadNum--;
    					task();
    					m_idleThreadNum++;
    				}
    			}
    		}
    	};
    }
    
    #endif

     

    该线程池的使用如下,

    ThreadPool.cpp:

    #include "ThreadPool.hpp"
    #include <iostream>
    
    struct gfun {
    	int operator()(int n) {
    		printf("%d  hello, gfun !  %d\n", n, std::this_thread::get_id());
    		return 42;
    	}
    };
    
    class Test
    {
    public:
    	int GetThreadId(std::string a, double b)
    	{
    		std::this_thread::sleep_for(std::chrono::milliseconds(10000));
    		std::thread::id i = std::this_thread::get_id();
    		std::cout << "In Test, thread id: " << i << std::endl;
    		std::cout << "a: " << a.c_str() << ", b = " << b << std::endl;
    		return i.hash();
    	}
    };
    
    int main()
    {
    	ThreadPool::ThreadPool worker{ 4 };
    	Test t;
    	std::cout << "at the beginning: " << std::endl;
    	std::cout << "idle threads: " << worker.IdleCount() << std::endl;
    	std::cout << "tasks: " << worker.GetTaskSize() << std::endl;
    	std::future<int> f1 = worker.Submit(std::bind(&Test::GetThreadId, &t, "123", 456.789));
    
    	std::cout << "after submit 1 task: " << std::endl;
    	std::cout << "idle threads: " << worker.IdleCount() << std::endl;
    	std::cout << "tasks: " << worker.GetTaskSize() << std::endl;
    	std::future<int> f2 = worker.Submit(std::mem_fn(&Test::GetThreadId), &t, "789", 123.456);
    
    	std::cout << "after submit 2 task: " << std::endl;
    	std::cout << "idle threads: " << worker.IdleCount() << std::endl;
    	std::cout << "tasks: " << worker.GetTaskSize() << std::endl;
    	std::future<int> f3 = worker.Submit(gfun{}, 0);
    
    	std::cout << "f1 = " << f1.get() << ", f2 = " << f2.get() << ", f3 = " << f3.get() << std::endl;
    
    	std::cout << "after all task: " << std::endl;
    	std::cout << "idle threads: " << worker.IdleCount() << std::endl;
    	std::cout << "tasks: " << worker.GetTaskSize() << std::endl;
    	return 0;
    }

     

    注意上面的例子中,线程池的线程数是一开始定义好的,此后也不会改变。但实际上,线程池中的线程数可以根据任务的多少动态调节。对于大规模计算的需求而言,cpu核数是固定的,要计算的任务数基本也是固定的,所以没必要这样做;如果是服务器处理网络请求,则可以采用线程数动态调节的方式,增加可扩展性。

     

    完整的vs2013工程包可在如下地址下载

    https://download.csdn.net/download/u014559935/10838079

    转载于:https://my.oschina.net/propagator/blog/2985836

    展开全文
  • C++11 加入了线程库,从此告别了标准库不支持并发的历史。然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现,譬如线程池、信号量等
  • 线程池是一种多线程的处理模式,在处理的过程中将任务添加到队列中,在创建线程后启动这些任务。 我们知道,生产一个线程的开销比一个进程要小的多,线程的销毁也比进程方便。但是我们在调用多线程的时候,不同线程...
  • 前言 在很公司小组都需要使用会议室进行讨论,但是每个小组都配备一个...设计线程池有几个关键的问题:第一,线程中应该创建几个工作线程;第二,是否应该等待线程执行结束… 第一个线程 #include <thread> #
  • 由浅入深的介绍 linux windows下多线程程序设计,线程池模型设计,针对多线程编程,详细地介绍 Windows 和 Linux操作系统层面上提供的各种多线程接口,理解并熟悉它们的使用操作系统层面上关于多线程多线程协作的...
  • 1.多线程加锁 #include <mutex> mutex mut; mut.lock (); mut.unlock (); 以下代码执行结果为200000000,证明没有发生val被一个线程获取处理还没写回的时候,被另一个线程读走。 如果不加锁,val的结果会...
  • c++11线程池实现

    万次阅读 多人点赞 2015-07-22 00:01:39
    然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现,譬如线程池、信号量等。线程池(thread pool)这个东西,在面试上多次被问到,一般的回答都是:“管理一个任务队列,一个线程队列,然后...
  • 学习多线程之前,我们先要了解几个关于多线程有关的概念。 进程:进程指正在运行的程序。确切的来说,当一个程序进入内存运行,即变成一个进程,进程是处于运行过程中的程序,并且具有一定独立功能。 线程:...
  • C++线程线程池

    万次阅读 2019-11-30 11:01:44
    C++多线程技术与线程池 0: 获取线程ID #include <unistd.h> #include <sys/syscall.h> #include <sys/types.h> std::uint32_t lwp_id() { #if defined(APPLE) || defined(__APPLE__) || defined...
  • C++多线程编程(教程+Demo)

    热门讨论 2013-12-10 16:55:05
    Win32 SDK函数支持进行多线程的程序设计,并提供了操作系统原理...Visual C++ 6.0中,使用MFC类库也实现了多线程的程序设计,使得多线程编程更加方便。 该教程提供了由浅入深的讲解及Demo,对初学多线程编程很有帮助。
  • 传 统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即 时销毁”的策略。尽管与创建进程相比,创建线程的时间...
  • C++ 线程池

    万次阅读 2019-05-18 19:53:33
    2019-05-17 20:16:52 原文链接 C++线程池 ...使用线程,无限制循环等待队列,进行计算和操作。帮助快速降低和减少性能损耗。 线程池的组成 线程池管理器:初始化和创建线程,启动和停止线程,调...
  • C++线程池/线程工具

    2018-04-08 01:12:50
    2:可以使用类成员函数/全局函数单独的为线程池添加一个任务,可以带个参数。 3:线程池线程数量可手动扩展,稍作修改可以修改为自动扩充。 是否下载可前往...
  • 有点瑕疵,在绑定函数时不应新建线程,还在想优化方法 #include<iostream> #include<thread> #include<vector> #include<mutex> #include<stdlib.h> #include<windows.h> std::...
  • 一个易于使用的C ++ 11线程池。 使用ThreadPool类对自由函数进行排队,并使用std :: for_each()和std :: transform()的并行版本。 可配置为仅用于标头或与库一起使用。 有许多用法示例。
  • C++多线程SOCKET收发

    2014-07-08 16:47:25
    C++多线程SOCKET收发纯手工打造,网上的例子和解释都不行~既可以学习多线程操作,又可以实现SOCLET编程
  • 详解C++17线程池的实现

    千次阅读 多人点赞 2019-03-30 02:43:42
    在其他的OOP语言(例如Java、C#等)中有线程池来避免过多过频创建线程,提升了不少的性能,在网络库中也有用到线程池的情况,因此了解一个线程池的实现可以更加熟悉多线程的使用。本文将用C++17实现一个简单的线程池,...
  • c++11中线程池和定时器的使用

    千次阅读 2019-02-17 15:07:27
    本篇文章主要介绍线程池和定时器的混合使用,实现多线程的时间调度 线程池:一种线程的使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行...
  • C++高性能线程池代码

    热门讨论 2011-02-22 13:58:35
    高性能C++线程池,提升代码执行性能,用于服务器编程
  • 多线程并行运算

    2018-05-16 15:55:53
    能够利用多线程实现并行运算,并获得良好得效果,能够很好得利用多线程实现查找运算

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 30,448
精华内容 12,179
关键字:

c++多线程线程池

c++ 订阅