跳转至

线程池设计

  • 线程池:线程池具有预先分配一些线程,自动从任务队列获取任务执行的功能。
  • 优点:减少线程创建销毁的时间,提高效应速度
  • 缺点:内存占用增加
  • 使用场景:线程创建频繁的,成本比较大的

1 线程池架构

2 线程池类设计

  1. 线程函数
  2. 函数需要一直执行,通常while循环实现
  3. 能够响应任务的执行信号和线程的停止信号,通常通过条件变量阻塞等待执行信号到来
  4. 任务队列
  5. 任务队列里的函数原型通常是void()型,即无返回值,无参数;
  6. 任务队列函数实现能容纳任务类型的函数,通常利用lambda函数将外面函数作为参数导入函数内部使用

3 线程池管理类设计

  1. 控制线程池的扩缩容,销毁功能

4 源码设计

  • 线程池代码
#include<functional>
#include<type_traits>
#include<memory>
#include<mutex>
#include<future>
#include<atomic>
#include<condition_variable>
#include<thread>
#include<queue>
#include<vector>
class ThreadPool
{
private:
    using task = std::function<void()>;
private:
    int m_tNum;//线程池大小
    int m_tunusedNum;//可用线程数
    std::queue<task> m_tasks;//任务队列
    std::vector<std::thread> m_threads;//线程容器
    std::mutex m_m;
    std::condition_variable m_cv;
    std::atomic<bool> m_stop{ false };
public:
    ThreadPool(const unsigned int& sz);
    ~ThreadPool();
    template<typename F, typename ...Args>
    auto addTask(F&& f, Args&& ...args)->std::future<typename std::result_of_t<F(Args...)>>;
private:
    void createThreadPool(const unsigned int& sz);
};
ThreadPool::ThreadPool(const unsigned int& sz) :m_tNum(sz)
{
    //创建线程
    createThreadPool(sz);
}
ThreadPool::~ThreadPool()
{
    m_stop.store(true);
    m_cv.notify_all();
    for (auto& t : m_threads)
    {
        if (t.joinable())
            t.join();
    }

}
void ThreadPool::createThreadPool(const unsigned int& sz)
{
    for (size_t i = 0; i < sz; ++i)
    {
        m_threads.emplace_back([this]() {
            while (true)
            {
                task t;
                {
                    std::unique_lock<std::mutex> lg(m_m);
                    //激活条件:1.线程池不再使用。2.任务队列不为空。
                    m_cv.wait(lg, [this]()->bool {
                        return m_stop.load() || !m_tasks.empty();
                        });
                    //线程池不再使用,终结此线程
                    if (m_stop.load())
                        return;
                    //从任务队列取出任务,并执行
                    t = m_tasks.front();
                    m_tasks.pop();
                }
                //执行任务
                t();
            }
            });
    }
}

template<typename F, typename ...Args>
auto ThreadPool::addTask(F&& f, Args&& ...args)->std::future<typename std::result_of_t<F(Args...)>>
{
    using RET_TYPE = typename std::result_of_t<F(Args...)>;
    //创建智能指针保存 packaged_task类型对象,packaged_task持有ET_TYPE()函数
    auto sh_task = std::make_shared<std::packaged_task<RET_TYPE()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    //将新任务放入任务队列中
    {
        std::unique_lock<std::mutex> lg(m_m);
        //将任务放入void()函数中,再放入任务队列中
        m_tasks.emplace([sh_task] {
            (*sh_task)();
            });
    }
    //唤醒一个线程
    m_cv.notify_one();
    return sh_task->get_future();
}
  • 测试代码
#include<iostream>
using namespace std;
int main() {
    ThreadPool pool(5);
    vector<future<int>> res;
    for (size_t i = 0; i < 5; i++)
    {
        res.emplace_back(pool.addTask([](int j) {
            printf("函数%d,线程ID %d\n",j,this_thread::get_id());
            return j*j;
            },i));
    }
    for (auto & elm:res)
    {
        cout << elm.get() << endl;
    }   
}