c++线程¶
线程可以总结成2部分,一个是如何启动线程,另一个是如何保证线程安全
1 启动线程¶
1.1 thread¶
- 头文件:
#include<thread>
thread是一个类,构造thread对象时传入函数以及参数,thread会启动一个线程
#include<thread>
#include<iostream>
using namespace std;
int add(int x, int y)
{
cout << "x + y = " << x + y << endl;
return x + y;
}
int main()
{
thread t(add, 2, 3);
t.join();
}
thread类有几个重要的成员函数
- thread::join()
:阻塞等待线程执行完成
- thread::detach
:卸离线程到后台执行(主线程无法控制线程了)
- thread::get_id
:获取线程id
- thread::joinable()
:判断thread是否有一个关联线程(d当thread对象由默认构造创建时就是一个没有关联线程的thread,这也是没意义的)
1.2 future¶
- 头文件:
#include<future>
- 来源:
async
1.2. -packaged_task
c++11不仅增加了thread还增加了future,这使得可以获得线程执行结果(即线程函数返回值),而这在thread中是无法直接获取返回值的(在以前的方案中往往把需要的结果以参数形式传给函数)。future由async或packaged_task产生。
- future有以下几个重要函数
future::get()
:获取future保有的线程函数返回值(阻塞直到线程执行完成)future::wait()
:阻塞直到线程完成future::wait_for()
:阻塞一段时间,但并不保证线程能够执行完future::wait_until()
:阻塞直到某一时间,但并不保证线程能够执行完future::valid()
判断future是否持有线程函数状态future::share()
:产生一个shared_future
,并使当前future
失效
shared_future
与future
不同在于,future
只能使用一次get
函数就会使future
持有的状态失效(即再次调用get将会失败),调用valid
会返回false
;而shared_future
能够多次使用get
来获取线程函数执行完返回值。
#include<future>
#include<iostream>
using namespace std;
int add(int x, int y)
{
cout << "x + y = " << x + y << endl;
return x + y;
}
int main()
{
future<int> fi(async(add, 2, 3));
int iv=fi.get();
}
1.2.1 async¶
async
是一个函数,用于启动一个线程并返回一个future
,相当于把线程的执行权交给了future
1.2.2 packaged_task¶
packaged_task
是一个类,一般用于线程池中,它并不启动一个线程,但他会返回一个future
,packaged_task
内部实现了()
运算符,一般在另一个线程中去执行,在另一线程调用packaged_task::get_future()
返回的future
对象去获取线程packaged_task()
(函数对象)执行结果
1.3 this_thread¶
命名空间this_thread
提供了一些便于操作线程的函数
- this_thread::get_id()
- this_thread::sleep_for(dur)
- this_thread::sleep_until(tp)
- this_thread::yield()
:放弃当前线程的时间片(相当于告诉cpu去执行下一线程,我这不执行),用于轮询等待某一任务完成时能发挥重要作用
while(!readyFlag)
{
this_thread::yield();
}
1.4 总结¶
- 真正可以启动一个线程的是
async
(函数)和thread
(类) future
使获取线程函数执行结果更方便packaged_task
一般用于线程池实现
2 线程锁¶
2.1 mutex和lock¶
mutex
:互斥量可以直接锁住,但通常会通过lock_guard来锁recursive_mutex
:同mutex
,一般使用循环中,如一个函数调用另一个函数,这2个函数都持有相同的metux,这是recursive_mutex
就能避免死循环问题lock_guard
:持有mutex,锁住并自动释放锁unique_lock
:同lock_guard
,但多了些其它控制函数,比如可以主动调用unique_lock::unlock()
解锁
#include<mutex>
#include<iostream>
using namespace std;
int add(int x, int y)
{
cout << "x + y = " << x + y << endl;
return x + y;
}
int main()
{
//metux
{
mutex mt;
{
mt.lock();
add(2, 3);
mt.unlock();
}
{
lock_guard<mutex> lg(mt);
add(2, 3);
}
{
unique_lock<mutex> ulg(mt);
add(2, 3);
ulg.unlock();
}
}
}
2.2 condition_variable¶
condition_variable
条件变量,顾名思义就是用来当达到某一条件时才去执行某一操作。
- condition_variable
与unique_lock
一同使用
- condition_variable
有几个重要函数
- condition_variable::notify_one()
:唤醒一个等待线程
- condition_variable::notify_notify_all
:唤醒所有等待线程
- condition_variable::wait(unique_lock,bool);
:等待条件满足
- 示例如下
#include<future>
#include<thread>
#include<condition_variable>
#include<mutex>
#include<iostream>
using namespace std;
int main()
{
//condition_variable
{
mutex mt;
bool b = false;
condition_variable cv;
future<void> fv = async([&cv,&mt,&b] {
unique_lock<mutex> ug(mt);
cv.wait(ug, [&b] {
return b;
});
cout << "hello world!" << endl;
});
thread t([&cv, &mt, &b] {
unique_lock<mutex> ug(mt);
cv.notify_one();
b = true;
cout << "notify" << endl;
});
fv.get();
t.join();
}
}
输出结果
notify
hello world!
2.3 atomic¶
atomic
用来处理原子操作,比如对一个数据读写,就可以使用,或者作为一个条件在某些场合可以取代condition_variable
。
#include<atomic>
#include<iostream>
using namespace std;
int main()
{
//atomic
{
atomic<bool> b(false);
b.store(true);
while (b.load())
{
cout << "hello world" << endl;
}
}
}
3 线程池¶
- 实现代码
#include<functional>
#include<type_traits>
#include<memory>
#include<mutex>
#include<future>
#include<atomic>
#include<condition_variable>
#include<thread>
#include<queue>
#include<vector>
class ThreadPool
{
private:
using task = std::function<void()>;
private:
int m_tNum;//线程池大小
int m_tunusedNum;//可用线程数
std::queue<task> m_tasks;//任务队列
std::vector<std::thread> m_threads;//线程容器
std::mutex m_m;
std::condition_variable m_cv;
std::atomic<bool> m_stop{ false };
public:
ThreadPool(const unsigned int& sz);
~ThreadPool();
template<typename F, typename ...Args>
auto addTask(F&& f, Args&& ...args)->std::future<typename std::result_of_t<F(Args...)>>;
private:
void createThreadPool(const unsigned int& sz);
};
ThreadPool::ThreadPool(const unsigned int& sz) :m_tNum(sz)
{
//创建线程
createThreadPool(sz);
}
ThreadPool::~ThreadPool()
{
m_stop.store(true);
m_cv.notify_all();
for (auto& t : m_threads)
{
if (t.joinable())
t.join();
}
}
void ThreadPool::createThreadPool(const unsigned int& sz)
{
for (size_t i = 0; i < sz; ++i)
{
m_threads.emplace_back([this]() {
while (true)
{
task t;
{
std::unique_lock<std::mutex> lg(m_m);
//激活条件:1.线程池不再使用。2.任务队列不为空。
m_cv.wait(lg, [this]()->bool {
return m_stop.load() || !m_tasks.empty();
});
//线程池不再使用,终结此线程
if (m_stop.load())
return;
//从任务队列取出任务,并执行
t = m_tasks.front();
m_tasks.pop();
}
//执行任务
t();
}
});
}
}
template<typename F, typename ...Args>
auto ThreadPool::addTask(F&& f, Args&& ...args)->std::future<typename std::result_of_t<F(Args...)>>
{
using RET_TYPE = typename std::result_of_t<F(Args...)>;
//创建智能指针保存 packaged_task类型对象,packaged_task持有ET_TYPE()函数
auto sh_task = std::make_shared<std::packaged_task<RET_TYPE()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
//将新任务放入任务队列中
{
std::unique_lock<std::mutex> lg(m_m);
//将任务放入void()函数中,再放入任务队列中
m_tasks.emplace([sh_task] {
(*sh_task)();
});
}
//唤醒一个线程
m_cv.notify_one();
return sh_task->get_future();
}
- 测试代码
#include<iostream>
using namespace std;
int main() {
ThreadPool pool(5);
vector<future<int>> res;
for (size_t i = 0; i < 5; i++)
{
res.emplace_back(pool.addTask([](int j) {
printf("函数%d,线程ID %d\n",j,this_thread::get_id());
return j*j;
},i));
}
for (auto & elm:res)
{
cout << elm.get() << endl;
}
}
2 Linux pthread¶
- 注意事项
- 在编译代码时需要加上
-pthread
选项
1 线程¶
1.1 线程创建¶
线程函数一定要声明称`void* fun(void*)`类型
- 线程创建使用函数
pthread_self()
:获取当前线程idint pthread_create(pthread_t *thread, const pthread_attr_t *attr,void *(*start_routine) (void *), void *arg)
:创建一个线程,线程函数可以是静态类成员函数,可以是普通函数,但不能是类非静态成员函数int pthread_join(pthread_t thread, void **retval)
:阻塞等待线程结束,retval可以接收线程退出时返回的数据retval
:非NULL则含有由pthread_exit
或函数取消携带的返回值
int pthread_detach(pthread_t thread)
:分离线程,让其后台运行void pthread_exit(void *retval)
:终止当前线程retval
:返回的值,可以在pthread_join第2个参数获取
- 设置线程创建属性
int pthread_attr_init(pthread_attr_t *attr);
:初始化线程创建属性(attr
可有pthread_attr_
开头函数设置)int pthread_attr_destroy(pthread_attr_t *attr);
:释放线程创建属性pthread_exit
:线程退出,可携带退出数据,pthread_join中低个参数获取- 线程取消函数
int pthread_cancel(pthread_t thread)
:取消线程thread
:待取消的线程id
int pthread_setcancelstate(int state, int *oldstate)
:设置线程是否可以被取消的状态,决定pthread_cancel
发送的取消线程是否有效state
:新状态,可选值有:PTHREAD_CANCEL_ENABLE
(可以被取消,)、PTHREAD_CANCEL_DISABLE
(不可以被取消)oldstate
:原状态
int pthread_setcanceltype(int type, int *oldtype)
:设置线程的取消模式type
:新类型,可选择有:PTHREAD_CANCEL_DEFERRED
(延迟取消,在下一个取消点处)、PTHREAD_CANCEL_ASYNCHRONOUS
(立即取消)oldtype
:原类型
void pthread_testcancel(void)
:创建线程取消点,在pthread_setcanceltype
设置的PTHREAD_CANCEL_ASYNCHRONOUS
类型以及有线程取消事件时退出线程- 示例
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <ctype.h>
#define handle_error_en(en, msg) \
do \
{ \
errno = en; \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)
#define handle_error(msg) \
do \
{ \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)
struct thread_info
{ /* Used as argument to thread_start() */
pthread_t thread_id; /* ID returned by pthread_create() */
int thread_num; /* Application-defined thread # */
char *argv_string; /* From command-line argument */
};
/* Thread start function: display address near top of our stack,
and return upper-cased copy of argv_string */
static void *
thread_start(void *arg)
{
struct thread_info *tinfo = arg;
char *uargv, *p;
printf("Thread %d: top of stack near %p; argv_string=%s\n",
tinfo->thread_num, &p, tinfo->argv_string);
uargv = strdup(tinfo->argv_string);
if (uargv == NULL)
handle_error("strdup");
for (p = uargv; *p != '\0'; p++)
*p = toupper(*p);
return uargv;
}
int main(int argc, char *argv[])
{
int s, tnum, opt, num_threads;
struct thread_info *tinfo;
pthread_attr_t attr;
int stack_size;
void *res;
/* The "-s" option specifies a stack size for our threads */
stack_size = -1;
while ((opt = getopt(argc, argv, "s:")) != -1)
{
switch (opt)
{
case 's':
stack_size = strtoul(optarg, NULL, 0);
break;
default:
fprintf(stderr, "Usage: %s [-s stack-size] arg...\n",
argv[0]);
exit(EXIT_FAILURE);
}
}
num_threads = argc - optind;
/* Initialize thread creation attributes */
s = pthread_attr_init(&attr);
if (s != 0)
handle_error_en(s, "pthread_attr_init");
if (stack_size > 0)
{
s = pthread_attr_setstacksize(&attr, stack_size);
if (s != 0)
handle_error_en(s, "pthread_attr_setstacksize");
}
/* Allocate memory for pthread_create() arguments */
tinfo = calloc(num_threads, sizeof(struct thread_info));
if (tinfo == NULL)
handle_error("calloc");
/* Create one thread for each command-line argument */
for (tnum = 0; tnum < num_threads; tnum++)
{
tinfo[tnum].thread_num = tnum + 1;
tinfo[tnum].argv_string = argv[optind + tnum];
/* The pthread_create() call stores the thread ID into
corresponding element of tinfo[] */
s = pthread_create(&tinfo[tnum].thread_id, &attr,
&thread_start, &tinfo[tnum]);
if (s != 0)
handle_error_en(s, "pthread_create");
}
/* Destroy the thread attributes object, since it is no
longer needed */
s = pthread_attr_destroy(&attr);
if (s != 0)
handle_error_en(s, "pthread_attr_destroy");
/* Now join with each thread, and display its returned value */
for (tnum = 0; tnum < num_threads; tnum++)
{
s = pthread_join(tinfo[tnum].thread_id, &res);
if (s != 0)
handle_error_en(s, "pthread_join");
printf("Joined with thread %d; returned value was %s\n",
tinfo[tnum].thread_num, (char *)res);
free(res); /* Free memory allocated by thread */
}
free(tinfo);
exit(EXIT_SUCCESS);
}
1.2 线程属性¶
2 锁¶
2.1 互斥量¶
- 互斥量:
pthread_mutex_t
- 相关函数
int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);
:初始化互斥量,attr可以为NULLint pthread_mutex_lock(pthread_mutex_t *mutex);
:加锁int pthread_mutex_trylock(pthread_mutex_t *mutex);
:尝试加锁int thread_mutex_unlock(pthread_mutex_t *mutex);
:解锁int pthread_mutex_destroy(pthread_mutex_t *mutex);
:销毁互斥量
2.2 自旋锁¶
- 特点:等待锁时,处于忙等待状态,耗费cpu,适合在锁占用时间特别短的场景下使用
- 自旋锁:
pthread_spinlock_t
- 相关函数
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
:初始化自旋锁int pthread_spin_lock(pthread_spinlock_t *lock);
:加自旋锁int pthread_spin_trylock(pthread_spinlock_t *lock);
:尝试加自旋锁int pthread_spin_unlock(pthread_spinlock_t *lock);
:解自旋锁int pthread_spin_destroy(pthread_spinlock_t *lock);
:销毁自旋锁
2.3 条件变量¶
pthread_cond_wait 需要传入一个已经加锁的mutex,当阻塞时,mutex又被解锁。如果接收到信号,mutex又被加锁,所以pthread_cond_wait之后还需要解锁。
pthread_cond_signal 执行前也需要先先加锁,执行完再解锁
- 条件变量:
pthread_cond_t
- 相关函数
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
:初始化条件变量,sttr可为NULLint pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
:等待指定的时间,无论是否被唤醒都将返回int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
:阻塞等待被唤醒int pthread_cond_broadcast(pthread_cond_t *cond);
:唤醒所有阻塞在此cond上的线程int pthread_cond_signal(pthread_cond_t *cond);
:唤醒一个阻塞在此cond上的线程int pthread_cond_destroy(pthread_cond_t *cond);
:销毁条件变量- pthread_cond_wait 示例
pthread_mutex_lock(&mut);
while (!buffer_is_set())
pthread_cond_wait(&cond, &mut);
consume_buffer();
pthread_mutex_unlock(&mut);
- pthread_cond_signal 伪代码
pthread_lock(&mut);
setting_buffer(); /* Now buffer_is_set() will return true */
pthread_cond_signal(&cond);
pthread_unlock(&mut);
2.4 读写锁¶
https://www.cnblogs.com/dins/p/pthread-rwlock-t.html
- 读写锁:pthread_rwlock_t
- 相关函数
- int pthread_rwlock_init(pthread_rwlock_t * rwlock, const pthread_rwlockattr_t * attr);
:初始化读写锁
- int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
:加读锁
- int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
:加写锁
- int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
:解读写锁
- int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
:销毁读写锁