跳转至

c++线程

线程可以总结成2部分,一个是如何启动线程,另一个是如何保证线程安全

1 启动线程

1.1 thread

  • 头文件:#include<thread>

thread是一个类,构造thread对象时传入函数以及参数,thread会启动一个线程

#include<thread>
#include<iostream>
using namespace std;
int add(int x, int y)
{
    cout << "x + y = " << x + y << endl;
    return x + y;
}

int main()
{
    thread t(add, 2, 3);
    t.join();
}

thread类有几个重要的成员函数 - thread::join():阻塞等待线程执行完成 - thread::detach:卸离线程到后台执行(主线程无法控制线程了) - thread::get_id:获取线程id - thread::joinable():判断thread是否有一个关联线程(d当thread对象由默认构造创建时就是一个没有关联线程的thread,这也是没意义的)

1.2 future

  • 头文件:#include<future>
  • 来源:
  • async 1.2. - packaged_task

c++11不仅增加了thread还增加了future,这使得可以获得线程执行结果(即线程函数返回值),而这在thread中是无法直接获取返回值的(在以前的方案中往往把需要的结果以参数形式传给函数)。future由async或packaged_task产生。
  • future有以下几个重要函数
  • future::get():获取future保有的线程函数返回值(阻塞直到线程执行完成)
  • future::wait():阻塞直到线程完成
  • future::wait_for():阻塞一段时间,但并不保证线程能够执行完
  • future::wait_until():阻塞直到某一时间,但并不保证线程能够执行完
  • future::valid()判断future是否持有线程函数状态
  • future::share():产生一个shared_future,并使当前future失效

shared_futurefuture不同在于,future只能使用一次get函数就会使future持有的状态失效(即再次调用get将会失败),调用valid会返回false;而shared_future能够多次使用get来获取线程函数执行完返回值。

#include<future>
#include<iostream>
using namespace std;
int add(int x, int y)
{
    cout << "x + y = " << x + y << endl;
    return x + y;
}

int main()
{
    future<int> fi(async(add, 2, 3));
    int iv=fi.get();
}

1.2.1 async

async是一个函数,用于启动一个线程并返回一个future,相当于把线程的执行权交给了future

1.2.2 packaged_task

packaged_task是一个类,一般用于线程池中,它并不启动一个线程,但他会返回一个future,packaged_task内部实现了()运算符,一般在另一个线程中去执行,在另一线程调用packaged_task::get_future()返回的future对象去获取线程packaged_task()(函数对象)执行结果

1.3 this_thread

命名空间this_thread提供了一些便于操作线程的函数 - this_thread::get_id() - this_thread::sleep_for(dur) - this_thread::sleep_until(tp) - this_thread::yield():放弃当前线程的时间片(相当于告诉cpu去执行下一线程,我这不执行),用于轮询等待某一任务完成时能发挥重要作用

while(!readyFlag)
{
    this_thread::yield();
}

1.4 总结

  • 真正可以启动一个线程的是async(函数)和thread(类)
  • future使获取线程函数执行结果更方便
  • packaged_task一般用于线程池实现

2 线程锁

2.1 mutex和lock

  • mutex:互斥量可以直接锁住,但通常会通过lock_guard来锁
  • recursive_mutex:同mutex,一般使用循环中,如一个函数调用另一个函数,这2个函数都持有相同的metux,这是recursive_mutex就能避免死循环问题
  • lock_guard:持有mutex,锁住并自动释放锁
  • unique_lock:同lock_guard,但多了些其它控制函数,比如可以主动调用unique_lock::unlock()解锁
#include<mutex>
#include<iostream>
using namespace std;
int add(int x, int y)
{
    cout << "x + y = " << x + y << endl;
    return x + y;
}

int main()
{
    //metux
    {
        mutex mt;
        {
            mt.lock();
            add(2, 3);
            mt.unlock();
        }
        {
            lock_guard<mutex> lg(mt);
            add(2, 3);
        }
        {
            unique_lock<mutex> ulg(mt);
            add(2, 3);
            ulg.unlock();
        }
    }

}

2.2 condition_variable

condition_variable条件变量,顾名思义就是用来当达到某一条件时才去执行某一操作。 - condition_variableunique_lock一同使用 - condition_variable有几个重要函数 - condition_variable::notify_one():唤醒一个等待线程 - condition_variable::notify_notify_all:唤醒所有等待线程 - condition_variable::wait(unique_lock,bool);:等待条件满足 - 示例如下

#include<future>
#include<thread>
#include<condition_variable>
#include<mutex>
#include<iostream>
using namespace std;

int main()
{
    //condition_variable
    {
        mutex mt;
        bool b = false;
        condition_variable cv;
        future<void> fv = async([&cv,&mt,&b] {
            unique_lock<mutex> ug(mt);
            cv.wait(ug, [&b] {              
                return b;
            });
            cout << "hello world!" << endl; 
        });
        thread t([&cv, &mt, &b] {
            unique_lock<mutex> ug(mt);
            cv.notify_one();
            b = true;
            cout << "notify" << endl;
        });     
        fv.get();
        t.join();
    }
}

输出结果

notify
hello world!

2.3 atomic

atomic用来处理原子操作,比如对一个数据读写,就可以使用,或者作为一个条件在某些场合可以取代condition_variable

#include<atomic>
#include<iostream>
using namespace std;
int main()
{
    //atomic
    {
        atomic<bool> b(false);
        b.store(true);
        while (b.load())
        {
            cout << "hello world" << endl;
        }
    }
}

3 线程池

  • 实现代码
#include<functional>
#include<type_traits>
#include<memory>
#include<mutex>
#include<future>
#include<atomic>
#include<condition_variable>
#include<thread>
#include<queue>
#include<vector>
class ThreadPool
{
private:
    using task = std::function<void()>;
private:
    int m_tNum;//线程池大小
    int m_tunusedNum;//可用线程数
    std::queue<task> m_tasks;//任务队列
    std::vector<std::thread> m_threads;//线程容器
    std::mutex m_m;
    std::condition_variable m_cv;
    std::atomic<bool> m_stop{ false };
public:
    ThreadPool(const unsigned int& sz);
    ~ThreadPool();
    template<typename F, typename ...Args>
    auto addTask(F&& f, Args&& ...args)->std::future<typename std::result_of_t<F(Args...)>>;
private:
    void createThreadPool(const unsigned int& sz);
};
ThreadPool::ThreadPool(const unsigned int& sz) :m_tNum(sz)
{
    //创建线程
    createThreadPool(sz);
}
ThreadPool::~ThreadPool()
{
    m_stop.store(true);
    m_cv.notify_all();
    for (auto& t : m_threads)
    {
        if (t.joinable())
            t.join();
    }

}
void ThreadPool::createThreadPool(const unsigned int& sz)
{
    for (size_t i = 0; i < sz; ++i)
    {
        m_threads.emplace_back([this]() {
            while (true)
            {
                task t;
                {
                    std::unique_lock<std::mutex> lg(m_m);
                    //激活条件:1.线程池不再使用。2.任务队列不为空。
                    m_cv.wait(lg, [this]()->bool {
                        return m_stop.load() || !m_tasks.empty();
                        });
                    //线程池不再使用,终结此线程
                    if (m_stop.load())
                        return;
                    //从任务队列取出任务,并执行
                    t = m_tasks.front();
                    m_tasks.pop();
                }
                //执行任务
                t();
            }
            });
    }
}

template<typename F, typename ...Args>
auto ThreadPool::addTask(F&& f, Args&& ...args)->std::future<typename std::result_of_t<F(Args...)>>
{
    using RET_TYPE = typename std::result_of_t<F(Args...)>;
    //创建智能指针保存 packaged_task类型对象,packaged_task持有ET_TYPE()函数
    auto sh_task = std::make_shared<std::packaged_task<RET_TYPE()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    //将新任务放入任务队列中
    {
        std::unique_lock<std::mutex> lg(m_m);
        //将任务放入void()函数中,再放入任务队列中
        m_tasks.emplace([sh_task] {
            (*sh_task)();
            });
    }
    //唤醒一个线程
    m_cv.notify_one();
    return sh_task->get_future();
}
  • 测试代码
#include<iostream>
using namespace std;
int main() {
    ThreadPool pool(5);
    vector<future<int>> res;
    for (size_t i = 0; i < 5; i++)
    {
        res.emplace_back(pool.addTask([](int j) {
            printf("函数%d,线程ID %d\n",j,this_thread::get_id());
            return j*j;
            },i));
    }
    for (auto & elm:res)
    {
        cout << elm.get() << endl;
    }   
}

2 Linux pthread

  • 注意事项
  • 在编译代码时需要加上-pthread选项

1 线程

1.1 线程创建

线程函数一定要声明称`void* fun(void*)`类型
  • 线程创建使用函数
  • pthread_self():获取当前线程id
  • int pthread_create(pthread_t *thread, const pthread_attr_t *attr,void *(*start_routine) (void *), void *arg):创建一个线程,线程函数可以是静态类成员函数,可以是普通函数,但不能是类非静态成员函数
  • int pthread_join(pthread_t thread, void **retval):阻塞等待线程结束,retval可以接收线程退出时返回的数据
    • retval:非NULL则含有由pthread_exit或函数取消携带的返回值
  • int pthread_detach(pthread_t thread):分离线程,让其后台运行
  • void pthread_exit(void *retval):终止当前线程
    • retval:返回的值,可以在pthread_join第2个参数获取
  • 设置线程创建属性
  • int pthread_attr_init(pthread_attr_t *attr);:初始化线程创建属性(attr可有pthread_attr_开头函数设置)
  • int pthread_attr_destroy(pthread_attr_t *attr);:释放线程创建属性
  • pthread_exit:线程退出,可携带退出数据,pthread_join中低个参数获取
  • 线程取消函数
  • int pthread_cancel(pthread_t thread):取消线程
    • thread:待取消的线程id
  • int pthread_setcancelstate(int state, int *oldstate):设置线程是否可以被取消的状态,决定pthread_cancel发送的取消线程是否有效
    • state:新状态,可选值有:PTHREAD_CANCEL_ENABLE(可以被取消,)、PTHREAD_CANCEL_DISABLE(不可以被取消)
    • oldstate:原状态
  • int pthread_setcanceltype(int type, int *oldtype):设置线程的取消模式
    • type:新类型,可选择有:PTHREAD_CANCEL_DEFERRED(延迟取消,在下一个取消点处)、PTHREAD_CANCEL_ASYNCHRONOUS(立即取消)
    • oldtype:原类型
  • void pthread_testcancel(void):创建线程取消点,在pthread_setcanceltype设置的PTHREAD_CANCEL_ASYNCHRONOUS类型以及有线程取消事件时退出线程
  • 示例
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <ctype.h>

#define handle_error_en(en, msg) \
    do                           \
    {                            \
        errno = en;              \
        perror(msg);             \
        exit(EXIT_FAILURE);      \
    } while (0)

#define handle_error(msg)   \
    do                      \
    {                       \
        perror(msg);        \
        exit(EXIT_FAILURE); \
    } while (0)

struct thread_info
{                        /* Used as argument to thread_start() */
    pthread_t thread_id; /* ID returned by pthread_create() */
    int thread_num;      /* Application-defined thread # */
    char *argv_string;   /* From command-line argument */
};

/* Thread start function: display address near top of our stack,
   and return upper-cased copy of argv_string */

static void *
thread_start(void *arg)
{
    struct thread_info *tinfo = arg;
    char *uargv, *p;

    printf("Thread %d: top of stack near %p; argv_string=%s\n",
           tinfo->thread_num, &p, tinfo->argv_string);

    uargv = strdup(tinfo->argv_string);
    if (uargv == NULL)
        handle_error("strdup");

    for (p = uargv; *p != '\0'; p++)
        *p = toupper(*p);

    return uargv;
}

int main(int argc, char *argv[])
{
    int s, tnum, opt, num_threads;
    struct thread_info *tinfo;
    pthread_attr_t attr;
    int stack_size;
    void *res;

    /* The "-s" option specifies a stack size for our threads */

    stack_size = -1;
    while ((opt = getopt(argc, argv, "s:")) != -1)
    {
        switch (opt)
        {
        case 's':
            stack_size = strtoul(optarg, NULL, 0);
            break;

        default:
            fprintf(stderr, "Usage: %s [-s stack-size] arg...\n",
                    argv[0]);
            exit(EXIT_FAILURE);
        }
    }

    num_threads = argc - optind;

    /* Initialize thread creation attributes */

    s = pthread_attr_init(&attr);
    if (s != 0)
        handle_error_en(s, "pthread_attr_init");

    if (stack_size > 0)
    {
        s = pthread_attr_setstacksize(&attr, stack_size);
        if (s != 0)
            handle_error_en(s, "pthread_attr_setstacksize");
    }

    /* Allocate memory for pthread_create() arguments */

    tinfo = calloc(num_threads, sizeof(struct thread_info));
    if (tinfo == NULL)
        handle_error("calloc");

    /* Create one thread for each command-line argument */

    for (tnum = 0; tnum < num_threads; tnum++)
    {
        tinfo[tnum].thread_num = tnum + 1;
        tinfo[tnum].argv_string = argv[optind + tnum];

        /* The pthread_create() call stores the thread ID into
           corresponding element of tinfo[] */

        s = pthread_create(&tinfo[tnum].thread_id, &attr,
                           &thread_start, &tinfo[tnum]);
        if (s != 0)
            handle_error_en(s, "pthread_create");
    }

    /* Destroy the thread attributes object, since it is no
       longer needed */

    s = pthread_attr_destroy(&attr);
    if (s != 0)
        handle_error_en(s, "pthread_attr_destroy");

    /* Now join with each thread, and display its returned value */

    for (tnum = 0; tnum < num_threads; tnum++)
    {
        s = pthread_join(tinfo[tnum].thread_id, &res);
        if (s != 0)
            handle_error_en(s, "pthread_join");

        printf("Joined with thread %d; returned value was %s\n",
               tinfo[tnum].thread_num, (char *)res);
        free(res); /* Free memory allocated by thread */
    }

    free(tinfo);
    exit(EXIT_SUCCESS);
}

1.2 线程属性

2 锁

2.1 互斥量

  • 互斥量:pthread_mutex_t
  • 相关函数
  • int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);:初始化互斥量,attr可以为NULL
  • int pthread_mutex_lock(pthread_mutex_t *mutex);:加锁
  • int pthread_mutex_trylock(pthread_mutex_t *mutex);:尝试加锁
  • int thread_mutex_unlock(pthread_mutex_t *mutex);:解锁
  • int pthread_mutex_destroy(pthread_mutex_t *mutex);:销毁互斥量

2.2 自旋锁

  • 特点:等待锁时,处于忙等待状态,耗费cpu,适合在锁占用时间特别短的场景下使用
  • 自旋锁:pthread_spinlock_t
  • 相关函数
  • int pthread_spin_init(pthread_spinlock_t *lock, int pshared);:初始化自旋锁
  • int pthread_spin_lock(pthread_spinlock_t *lock);:加自旋锁
  • int pthread_spin_trylock(pthread_spinlock_t *lock);:尝试加自旋锁
  • int pthread_spin_unlock(pthread_spinlock_t *lock);:解自旋锁
  • int pthread_spin_destroy(pthread_spinlock_t *lock);:销毁自旋锁

2.3 条件变量

pthread_cond_wait 需要传入一个已经加锁的mutex,当阻塞时,mutex又被解锁。如果接收到信号,mutex又被加锁,所以pthread_cond_wait之后还需要解锁。
pthread_cond_signal 执行前也需要先先加锁,执行完再解锁
  • 条件变量:pthread_cond_t
  • 相关函数
  • int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);:初始化条件变量,sttr可为NULL
  • int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);:等待指定的时间,无论是否被唤醒都将返回
  • int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);:阻塞等待被唤醒
  • int pthread_cond_broadcast(pthread_cond_t *cond);:唤醒所有阻塞在此cond上的线程
  • int pthread_cond_signal(pthread_cond_t *cond);:唤醒一个阻塞在此cond上的线程
  • int pthread_cond_destroy(pthread_cond_t *cond);:销毁条件变量
  • pthread_cond_wait 示例
pthread_mutex_lock(&mut);

while (!buffer_is_set())
    pthread_cond_wait(&cond, &mut);

consume_buffer();

pthread_mutex_unlock(&mut);
  • pthread_cond_signal 伪代码
 pthread_lock(&mut);

 setting_buffer(); /* Now buffer_is_set() will return true */

 pthread_cond_signal(&cond);

 pthread_unlock(&mut);

2.4 读写锁

https://www.cnblogs.com/dins/p/pthread-rwlock-t.html - 读写锁:pthread_rwlock_t - 相关函数 - int pthread_rwlock_init(pthread_rwlock_t * rwlock, const pthread_rwlockattr_t * attr);:初始化读写锁 - int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);:加读锁 - int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);:加写锁 - int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);:解读写锁 - int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);:销毁读写锁