从百行线程池了解线程池相关技术


概述

在实际编程实践的过程中,一个应用通常由多个线程组成,这些线程通常可以分成两类

  1. 常驻线程:这种线程的生命周期通常和应用的生命周期大致相同,在应用运行的期间,这些线程就一直存在。通常这些线程负责的是应用整个生命周期内,需要不断运行的任务。比如,用于负责整个应用的日志记录的线程、负责通讯的线程、负责UI交互的线程
  2. 非常驻线程:用于执行异步操作任务,该任务的执行周期较短,并且不是周期性的任务,没有必要建立一个常驻线程。比如,进行计算量比较大的任务,比如矩阵分块运算的时候,可以将一个任务拆分给多个线程进行计算,然后在汇总结果从而提高计算速度。此时这些拆分出来的任务,可以为其创建新的线程来进行计算,当计算完成返回计算结果之后,线程销毁

2种通过创建一个线程,对异步任务进行计算然后在计算完成返回结果之后再销毁线程。这种设计在硬件资源比较充足或者这种”临时性”的异步需求不是特别频繁的时候,是无需特别在意的,但是如果需要频繁地请求创建任务和销毁任务,那么由此带来的额外开销可能会比执行任务本身的开销更大(比如简单地计算任务),那么这种情况下使用线程池可以缓解这个问题。

一些关键点

实现一个线程池通常需要以下几个关键点:

  1. 线程池中的池,即预先创建的若干线程的存储
  2. 线程池中线程的状态管理,即线程未执行异步任务时需要阻塞,执行异步任务的时候需要唤醒。以及识别当前线程中线程处于何种状态(空闲/忙碌)
  3. 通用任务的封装,线程池通常不是为执行单一某种异步任务而设计的,是为了多种异步任务而设计的,此时需要将任务进行封装,从而使得这些任务可以以一种统一的方式传递给线程池,供线程池进行执行

7.5k star的轻量级线程池,算上头文件代码行数也就100行。

代码实现

数据成员

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

数据成员其实不算多,仅仅5个

  • workers:用来存储预先创建(pre-allocate)的线程对象的,是用来保存线程对象的容器
  • tasks:任务队列(queue),用来存储接受到异步任务的。当有任务需要该线程池来执行时,将任务放入该队列中。线程池中有空闲的线程时,从中唤醒一个空闲的线程,然后从该队列中取出一个任务,将该任务交给该线程来执行
  • queue_mutex:互斥锁,用来保证任务队列tasks以及控制线程池停止的stop标志位的多线程并发安全的
  • condition:条件变量,用来实现线程池内的线程的休眠(阻塞)和运行(唤醒)的
  • stop:用来结束线程池的运行,用来控制线程池停止,线程池内的线程的退出和销毁

构造

讨论完线程池的内部数据成员,我们看一下该线程池对象ThreadPool的构造函数的代码实现,代码如下:

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

在构造函数处通过for循环创建完指定数量线程,同时在线程内部启动一个工作函数,保证线程安全的同时用来不断消费任务队列里的任务。同时调用线程池类的condition条件变量的wait函数,等待条件变量的满足。如果线程池内存在线程由于任务队列为空时,进入阻塞等待状态时,此时如果有新的任务添加到任务队列,在接受任务的函数内会调用条件变量的通知函数,此时线程池内会有一个线程在此行函数,判定被通知为条件变量满足,退出等待条件变量的阻塞状态,唤醒开始执行该任务。

异步任务接受函数

大部分线程池的核心部分,如何添加异步任务,首先我们看一下函数签名

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}
  1. 函数返回值:std::future<typename std::result_of<F(Args...)>::type>,返回一个future类型的对象,该对象可以用于异步任务创建方来获取异步调用结果。
  2. 函数形参:f用于接受携带异步任务的可调用对象,args用来携带执行异步调用需要的函数调用的参数信息。

更推荐用 std::invoke_result_t<F, Args...>>去取代std::result_of<F(Args...)>::type

重点在task是怎么被包装的,通过构建一个shared_ptr指针,该指针指向一个packaged_task对象,同时参数通过std::bind(),对可调用对象f进行柯里化,将可调用对象和其执行时需要的函数参数,封装成一个新的可调用对象,这个新的可调用对象进行函数调用的时候,无需进行函数传参。当然使用lambda表达式同样也可以进行绑定(asio中可以减少bind的使用)。

在这之后通过 task->get_future()获取异步调用的结果future对象并将这个future返回,同时不要忘记将task入队列,通过lambda包装后统一为void()函数类型。入队列后通知线程去取用,这里需要注意最好要等锁结束后再去通知,不然可能有极小概率出现通知结束后取不到锁的问题。

上面代码行中涉及到了我们前面讨论到的任务的统一化存储,我们集中梳理讨论一下。先说一下为什么要做任务的统一化存储?通常线程池是用来接受不同的异步操作任务的,每一个异步操作(异步调用)任务,通常是有一个可调用对象来存储的,这些可调用对象的进行函数调用的时候需要的参数和返回值的类型,是有差异的。如果直接对原始的异步操作任务进行存储,由于C++是强类型语言,是不能将有差异的异步操作类型保存到同一个容器中的。所以需要对异步操作任务进行封装,将其封装成统一的任务进行存储。

首先,通过std::bind对可调用对象进行柯里化,将带参数的可调用对象封装成无参数的可调用对象,从而磨平可调用对象在参数上的差异。这里std::bind通过接受初始可调用对象以及参数,然后根据两者构建出一个新的可调用对象,并将这个新的可调用对象返回完成柯里化操作。最后,通过lambdapackaged_task封装成一个无返回值,无参数的lambda对象,此时该lambda对象将最后一个差异,即返回值的差异,磨平从而可以达到统一存储到一个容器(任务队列)的目的。

为什么这里不将package_task对象直接存储到任务队列中?

这事因为packaged_task对象的类型是一个类模版std::packaged_task<return_type>,其模版实例化之后的类型信息中包含了原始可调用对象的返回值的类型信息。当可调用对象的返回值类型不同的时候,对应的packaged_task的类型信息是不相同的,无法存储到同一个容器中

析构

上面已经将线程池运行的时候的主要代码都讨论完毕,下面只剩下一个析构函数的代码实现。

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

思考一个问题

当调用条件变量的notify_one函数的时候,该操作仅仅会唤醒在该条件变量上处于等待阻塞的其他线程。当其他线程未处于等待阻塞状态的时候,此时此次通知不会传递到该线程。

比如有一个主线程和两个工作线程,当主线程调用条件变量的notify_one函数的时候,另外两个线程都处于工作状态,即没有阻塞在condition.wait函数上的时候,此时此次对于条件变量的这次notify_once函数的调用将会是无效的,后续这两个线程工作完成之后,重新等待该条件变量满足的时候,这两个线程都会阻塞在wait函数位置。

如果一瞬间异步调用的任务来的比较密集,造成异步调用队列的任务堆积,即所有的线程都处于工作状态,此时调用ThreadPoolenqueue成员函数将任务入任务队列的时候,此时当所有工作线程处理完当前的异步任务之后是否会处理这个新入队的异步任务?

 [this]
            {
                for(;;)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }

问题的关键之处,就在于std::condition_variable::wait这个函数的实现,我们看一下CPP Reference关于这个函数的相关描述

可以看见与这种写法是等价的,注意,也就是其会先判断传入的stop_waiting谓词判别式,当该判别式返回true的时候,意味着不需要在此条件变量上进行等待。此时不会进入while循环,不进行条件变量的wait操作。stop_waiting条件判别式在ThreadPool传入的是,如果当前线程池处于停止状态或者当前异步调用任务队列不为空(还有待执行的异步任务)的时候,该判别式返回true

因此当正在工作的工作线程结束当前工作的时候,重新进入下一次for循环的时候,如果此时异步调用队列不为空的时候,不会在条件变量上调用wait函数,此时会继续从异步任务队列中获取任务并对获取到的任务进行处理。

应用示例

 ThreadPool pool(4);
    std::vector< std::future<int> > results;

    for(int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                std::cout << "hello " << i << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                std::cout << "world " << i << std::endl;
                return i*i;
            })
        );
    }

    for(auto && result: results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;

文章作者: JoyTsing
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 JoyTsing !
评论
  目录