c++ 线程¶
线程可以总结成 2 部分,一个是如何启动线程,另一个是如何保证线程安全
1 启动线程¶
1.1 thread¶
- 头文件:
#include<thread>
thread 是一个类,构造 thread 对象时传入函数以及参数,thread 会启动一个线程
#include<thread>
#include<iostream>
using namespace std;
int add(int x, int y)
{
cout << "x + y = " << x + y << endl;
return x + y;
}
int main()
{
thread t(add, 2, 3);
t.join();
}
thread 类有几个重要的成员函数
- thread::join()
:阻塞等待线程执行完成
- thread::detach
:卸离线程到后台执行(主线程无法控制线程了)
- thread::get_id
:获取线程 id
- thread::joinable()
:判断 thread 是否有一个关联线程(d 当 thread 对象由默认构造创建时就是一个没有关联线程的 thread,这也是没意义的)
1.2 future¶
- 头文件:
#include<future>
- 来源:
async
packaged_task
promise
c++11不仅增加了thread还增加了future,这使得可以获得线程执行结果(即线程函数返回值),而这在thread中是无法直接获取返回值的(在以前的方案中往往把需要的结果以参数形式传给函数)。future由async或packaged_task产生。
- future 有以下几个重要函数
future::get()
:获取 future 保有的线程函数返回值 (阻塞直到线程执行完成)future::wait()
:阻塞直到线程完成future::wait_for()
:阻塞一段时间,但并不保证线程能够执行完future::wait_until()
:阻塞直到某一时间,但并不保证线程能够执行完future::valid()
判断 future 是否持有线程函数状态future::share()
:产生一个shared_future
,并使当前future
失效
shared_future
与 future
不同在于,future
只能使用一次 get
函数就会使 future
持有的状态失效(即再次调用 get 将会失败),调用 valid
会返回 false
; 而 shared_future
能够多次使用 get
来获取线程函数执行完返回值。
#include<future>
#include<iostream>
using namespace std;
int add(int x, int y)
{
cout << "x + y = " << x + y << endl;
return x + y;
}
int main()
{
future<int> fi(async(add, 2, 3));
int iv=fi.get();
}
1.3 async¶
async
是一个函数,用于启动一个线程并返回一个 future
,相当于把线程的执行权交给了 future
- 示例
#include <algorithm> #include <future> #include <iostream> #include <mutex> #include <numeric> #include <string> #include <vector> std::mutex m; struct X { void foo(int i, const std::string& str) { std::lock_guard<std::mutex> lk(m); std::cout << str << ' ' << i << '\n'; } void bar(const std::string& str) { std::lock_guard<std::mutex> lk(m); std::cout << str << '\n'; } int operator()(int i) { std::lock_guard<std::mutex> lk(m); std::cout << i << '\n'; return i + 10; } }; template<typename RandomIt> int parallel_sum(RandomIt beg, RandomIt end) { auto len = end - beg; if (len < 1000) return std::accumulate(beg, end, 0); RandomIt mid = beg + len / 2; auto handle = std::async(std::launch::async, parallel_sum<RandomIt>, mid, end); int sum = parallel_sum(beg, mid); return sum + handle.get(); } int main() { std::vector<int> v(10000, 1); std::cout << "和为 " << parallel_sum(v.begin(), v.end()) << '\n'; X x; // 以默认策略调用 x.foo(42, "Hello") : // 可能同时打印 "Hello 42" 或延迟执行 auto a1 = std::async(&X::foo, &x, 42, "Hello"); // 以 deferred 策略调用 x.bar("world!") // 调用 a2.get() 或 a2.wait() 时打印 "world!" auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!"); // 以 async 策略调用 X()(43) : // 同时打印 "43" auto a3 = std::async(std::launch::async, X(), 43); a2.wait(); // 打印 "world!" std::cout << a3.get() << '\n'; // 打印 "53" } // 若 a1 在此点未完成,则 a1 的析构函数在此打印 "Hello 42"
- 可能输出
和为 10000 43 world! 53 Hello 42
1.4 packaged_task¶
packaged_task
是一个类,一般用于线程池中,它并不启动一个线程,但他会返回一个 future
,packaged_task
内部实现了 ()
运算符,一般在另一个线程中去执行,在另一线程调用 packaged_task::get_future()
返回的 future
对象去获取线程 packaged_task()
(函数对象) 执行结果
- 示例
#include <iostream> #include <cmath> #include <thread> #include <future> #include <functional> // 避免对 std::pow 重载集消歧义的独有函数 int f(int x, int y) { return std::pow(x,y); } void task_lambda() { std::packaged_task<int(int,int)> task([](int a, int b) { return std::pow(a, b); }); std::future<int> result = task.get_future(); task(2, 9); std::cout << "task_lambda:\t" << result.get() << '\n'; } void task_bind() { std::packaged_task<int()> task(std::bind(f, 2, 11)); std::future<int> result = task.get_future(); task(); std::cout << "task_bind:\t" << result.get() << '\n'; } void task_thread() { std::packaged_task<int(int,int)> task(f); std::future<int> result = task.get_future(); std::thread task_td(std::move(task), 2, 10); task_td.join(); std::cout << "task_thread:\t" << result.get() << '\n'; } int main() { task_lambda(); task_bind(); task_thread(); }
- 输出
task_lambda: 512 task_bind: 2048 task_thread: 1024
1.5 promise¶
类模板 std::promise
提供存储值或异常的设施,之后通过 std::promise
对象所创建的 std::future 对象异步获得结果。注意 std::promise
只应当使用一次。·
- 示例
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // 提醒 future
}
void do_work(std::promise<void> barrier)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
barrier.set_value();
}
int main()
{
// 演示用 promise<int> 在线程间传递结果。
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
std::move(accumulate_promise));
// future::get() 将等待直至该 future 拥有合法结果并取得它
// 无需在 get() 前调用 wait()
//accumulate_future.wait(); // 等待结果
std::cout << "result=" << accumulate_future.get() << '\n';
work_thread.join(); // wait for thread completion
// 演示用 promise<void> 在线程间对状态发信号
std::promise<void> barrier;
std::future<void> barrier_future = barrier.get_future();
std::thread new_work_thread(do_work, std::move(barrier));
barrier_future.wait();
new_work_thread.join();
}
result=21
1.6 this_thread¶
命名空间 this_thread
提供了一些便于操作线程的函数
- this_thread::get_id()
- this_thread::sleep_for(dur)
- this_thread::sleep_until(tp)
- this_thread::yield()
:放弃当前线程的时间片(相当于告诉 cpu 去执行下一线程,我这不执行),用于轮询等待某一任务完成时能发挥重要作用
while(!readyFlag)
{
this_thread::yield();
}
1.7 总结¶
- 真正可以启动一个线程的是
async
(函数)和thread
(类) future
使获取线程函数执行结果更方便packaged_task
一般用于线程池实现
2 atomic¶
atomic
用来处理原子操作,比如对一个数据读写,就可以使用,或者作为一个条件在某些场合可以取代 condition_variable
。
#include<atomic>
#include<iostream>
using namespace std;
int main()
{
//atomic
{
atomic<bool> b(false);
b.store(true);
while (b.load())
{
cout << "hello world" << endl;
}
}
}
2.1 compare_exchange_weak 和 compare_exchange_strong¶
方法签名 | 序号 | 标准支持 |
---|---|---|
bool compare_exchange_weak( T& expected, T desired, std::memory_order) success, std::memory_order failure ) noexcept; |
(1) | (C++11 起) |
bool compare_exchange_weak( T& expected, T desired, std::memory_order success, std::memory_order failure ) volatile noexcept; |
(2) | (C++11 起) |
bool compare_exchange_weak( T& expected, T desired, std::memory_order order = std::memory_order_seq_cst ) noexcept; |
(3) | (C++11 起) |
bool compare_exchange_weak( T& expected, T desired, std::memory_order order = std::memory_order_seq_cst ) volatile noexcept; |
(4) | (C++11 起) |
bool compare_exchange_strong( T& expected, T desired, std::memory_order success, std::memory_order failure ) noexcept; |
(5) | (C++11 起) |
bool compare_exchange_strong( T& expected, T desired, std::memory_order success, std::memory_order failure ) volatile noexcept; |
(6) | (C++11 起) |
bool compare_exchange_strong( T& expected, T desired, std::memory_order order = std::memory_order_seq_cst ) noexcept; |
(7) | (C++11 起) |
bool compare_exchange_strong( T& expected, T desired, std::memory_order order = std::memory_order_seq_cst ) volatile noexcept; |
(8) | (C++11 起) |
以上方法参数解释如下: |
expected
:期待的值,如果和实际值不相等,那么返回 false,并且expected
被设置成实际值。(注意此为引用,因此可以被用来设置成实际的值。)desired
:待修改的值,当expected
等于实际值时,那么实际值就被更新成desired
的值。success
:读修改写操作所用的内存同步顺序。failure
:加载操作所用的内存同步顺序。order
:两个操作所用的内存同步顺序。 返回值:更新成功返回 true,失败返回 false。compare_exchange_weak
和compare_exchange_strong
最大的区别时:compare_exchange_weak
可能会虚假地失败,即比较相等但还是返回 false。compare_exchange_weak
性能比compare_exchange_strong
好。
2.2 is_lock_free¶
检查此类型所有对象上的原子操作是否免锁。一般结构体大小大于 8 不免锁。
#include <iostream>
#include <atomic>
struct A{
int a;
int b;
};
struct B{
int a;
double b;
};
int main() {
std::atomic<int> atomicInt;
std::atomic<double> atomicDouble;
std::atomic<A> atomicA;
std::atomic<B> atomicB;
bool intLockFree = atomicInt.is_lock_free();
bool doubleLockFree = atomicDouble.is_lock_free();
bool aLockFree = atomicA.is_lock_free();
bool bLockFree = atomicB.is_lock_free();
std::cout << "int is lock-free: " << intLockFree << std::endl;
std::cout << "double is lock-free: " << doubleLockFree << std::endl;
std::cout << "A is lock-free: " << aLockFree << std::endl;
std::cout << "B is lock-free: " << bLockFree << std::endl;
return 0;
}
输出结果:
int is lock-free: 1
double is lock-free: 1
A is lock-free: 1
B is lock-free: 0
2.3 load、store、exchange¶
T load( std::memory_order order = std:: memory_order_seq_cst ) const noexcept;
:原子地获得原子对象的值。void store( T desired, std::memory_order order = std:: memory_order_seq_cst ) noexcept;
:原子地以非原子对象替换原子对象的值。T exchange( T desired, std::memory_order order = std:: memory_order_seq_cst ) noexcept;
:原子地替换原子对象的值并获得它先前持有的值。
store 和 exchange 不同在于 exchange 会返回修改前的值。
2.4 其它接口¶
方法签名 | 解释 |
---|---|
fetch_add | 原子地将参数加到存储于原子对象的值,并返回先前保有的值 (公开成员函数) |
fetch_sub | 原子地从存储于原子对象的值减去参数,并获得先前保有的值 (公开成员函数) |
fetch_and | 原子地进行参数和原子对象的值的逐位与,并获得先前保有的值 (公开成员函数) |
fetch_or | 原子地进行参数和原子对象的值的逐位或,并获得先前保有的值 (公开成员函数) |
fetch_xor | 原子地进行参数和原子对象的值的逐位异或,并获得先前保有的值 (公开成员函数) |
operator++ operator++(int) operator-- operator--(int) |
令原子值增加或减少一 (公开成员函数) |
operator+= operator-= operator&= operator\|= operator^= |
加、减,或与原子值进行逐位与、或、异或 (公开成员函数) |
2.5 memory_order¶
memory_order_relaxed
:- 最轻量级的内存顺序要求。
- 不对操作进行任何排序,也不提供任何同步。
-
适用于不依赖于操作顺序的独立读取和写入操作。
-
memory_order_consume
: - 用于指示数据依赖关系,通常用于读取依赖于之前写入的数据。
-
并没有广泛支持,因此不太常用。
-
memory_order_acquire
: - 用于读取操作,确保在此操作之后的读取不会被重排序到这个操作之前。
-
用于实现读取依赖,确保读取到最新的数据。
-
memory_order_release
: - 用于写入操作,确保在此操作之前的写入不会被重排序到这个操作之后。
-
用于实现写入依赖,确保写入的数据对其他线程可见。
-
memory_order_acq_rel
: -
用于既包含读取又包含写入操作的情况,提供了一种更强的同步保证。
-
memory_order_seq_cst
: - 最强的内存顺序要求。
- 对操作进行全局排序,确保所有线程都能看到相同的操作顺序。
- 适用于需要强一致性保证的情况,但可能在性能上有一定开销。
2.6 示例¶
#include <atomic>
template<typename T>
struct node
{
T data;
node* next;
node(const T& data) : data(data), next(nullptr) {}
};
template<typename T>
class stack
{
std::atomic<node<T>*> head;
public:
void push(const T& data)
{
node<T>* new_node = new node<T>(data);
// 将 head 的当前值放到 new_node->next 中
new_node->next = head.load(std::memory_order_relaxed);
// 现在令 new_node 为新的 head ,但如果 head 不再是
// 存储于 new_node->next 的值(某些其他线程必须在刚才插入结点)
// 那么将新的 head 放到 new_node->next 中并再尝试
while(!head.compare_exchange_weak(new_node->next, new_node,
std::memory_order_release,
std::memory_order_relaxed))
; // 循环体为空
// 注意:上述使用至少在这些版本不是线程安全的
// 先于 4.8.3 的 GCC(漏洞 60272),先于 2014-05-05 的 clang(漏洞 18899)
// 先于 2014-03-17 的 MSVC(漏洞 819819)。下面是变通方法:
// node<T>* old_head = head.load(std::memory_order_relaxed);
// do
// {
// new_node->next = old_head;
// }
// while (!head.compare_exchange_weak(old_head, new_node,
// std::memory_order_release,
// std::memory_order_relaxed));
}
};
int main()
{
stack<int> s;
s.push(1);
s.push(2);
s.push(3);
}
#include <atomic>
#include <iostream>
std::atomic<int> ai;
int tst_val = 4;
int new_val = 5;
bool exchanged= false;
void valsout()
{
std::cout << "ai = " << ai
<< " tst_val = " << tst_val
<< " new_val = " << new_val
<< " exchanged = " << std::boolalpha << exchanged
<< "\n";
}
int main()
{
ai= 3;
valsout();
// tst_val != ai ==> tst_val 被修改
exchanged= ai.compare_exchange_strong(tst_val, new_val);
valsout();
// tst_val == ai ==> ai 被修改
exchanged= ai.compare_exchange_strong(tst_val, new_val);
valsout();
}
3 线程锁¶
3.1 mutex 和 lock¶
mutex
:互斥量可以直接锁住,但通常会通过 lock_guard 来锁recursive_mutex
: 同mutex
,一般使用循环中,如一个函数调用另一个函数,这 2 个函数都持有相同的 metux,这是recursive_mutex
就能避免死循环问题lock_guard
:持有 mutex,锁住并自动释放锁unique_lock
: 同lock_guard
,但多了些其它控制函数,比如可以主动调用unique_lock::unlock()
解锁
#include<mutex>
#include<iostream>
using namespace std;
int add(int x, int y)
{
cout << "x + y = " << x + y << endl;
return x + y;
}
int main()
{
//metux
{
mutex mt;
{
mt.lock();
add(2, 3);
mt.unlock();
}
{
lock_guard<mutex> lg(mt);
add(2, 3);
}
{
unique_lock<mutex> ulg(mt);
add(2, 3);
ulg.unlock();
}
}
}
3.2 condition_variable¶
condition_variable
条件变量,顾名思义就是用来当达到某一条件时才去执行某一操作。
- condition_variable
与 unique_lock
一同使用
- condition_variable
有几个重要函数
- condition_variable::notify_one()
:唤醒一个等待线程
- condition_variable::notify_notify_all
:唤醒所有等待线程
- condition_variable::wait(unique_lock,bool);
:等待条件满足
- 示例如下
#include<future>
#include<thread>
#include<condition_variable>
#include<mutex>
#include<iostream>
using namespace std;
int main()
{
//condition_variable
{
mutex mt;
bool b = false;
condition_variable cv;
future<void> fv = async([&cv,&mt,&b] {
unique_lock<mutex> ug(mt);
cv.wait(ug, [&b] {
return b;
});
cout << "hello world!" << endl;
});
thread t([&cv, &mt, &b] {
unique_lock<mutex> ug(mt);
cv.notify_one();
b = true;
cout << "notify" << endl;
});
fv.get();
t.join();
}
}
输出结果
notify
hello world!
4 线程池¶
- 实现代码
#include<functional>
#include<type_traits>
#include<memory>
#include<mutex>
#include<future>
#include<atomic>
#include<condition_variable>
#include<thread>
#include<queue>
#include<vector>
class ThreadPool
{
private:
using task = std::function<void()>;
private:
int m_tNum;//线程池大小
int m_tunusedNum;//可用线程数
std::queue<task> m_tasks;//任务队列
std::vector<std::thread> m_threads;//线程容器
std::mutex m_m;
std::condition_variable m_cv;
std::atomic<bool> m_stop{ false };
public:
ThreadPool(const unsigned int& sz);
~ThreadPool();
template<typename F, typename ...Args>
auto addTask(F&& f, Args&& ...args)->std::future<typename std::result_of_t<F(Args...)>>;
private:
void createThreadPool(const unsigned int& sz);
};
ThreadPool::ThreadPool(const unsigned int& sz) :m_tNum(sz)
{
//创建线程
createThreadPool(sz);
}
ThreadPool::~ThreadPool()
{
m_stop.store(true);
m_cv.notify_all();
for (auto& t : m_threads)
{
if (t.joinable())
t.join();
}
}
void ThreadPool::createThreadPool(const unsigned int& sz)
{
for (size_t i = 0; i < sz; ++i)
{
m_threads.emplace_back([this]() {
while (true)
{
task t;
{
std::unique_lock<std::mutex> lg(m_m);
//激活条件:1.线程池不再使用。2.任务队列不为空。
m_cv.wait(lg, [this]()->bool {
return m_stop.load() || !m_tasks.empty();
});
//线程池不再使用,终结此线程
if (m_stop.load())
return;
//从任务队列取出任务,并执行
t = m_tasks.front();
m_tasks.pop();
}
//执行任务
t();
}
});
}
}
template<typename F, typename ...Args>
auto ThreadPool::addTask(F&& f, Args&& ...args)->std::future<typename std::result_of_t<F(Args...)>>
{
using RET_TYPE = typename std::result_of_t<F(Args...)>;
//创建智能指针保存 packaged_task类型对象,packaged_task持有ET_TYPE()函数
auto sh_task = std::make_shared<std::packaged_task<RET_TYPE()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
//将新任务放入任务队列中
{
std::unique_lock<std::mutex> lg(m_m);
//将任务放入void()函数中,再放入任务队列中
m_tasks.emplace([sh_task] {
(*sh_task)();
});
}
//唤醒一个线程
m_cv.notify_one();
return sh_task->get_future();
}
- 测试代码
#include<iostream>
using namespace std;
int main() {
ThreadPool pool(5);
vector<future<int>> res;
for (size_t i = 0; i < 5; i++)
{
res.emplace_back(pool.addTask([](int j) {
printf("函数%d,线程ID %d\n",j,this_thread::get_id());
return j*j;
},i));
}
for (auto & elm:res)
{
cout << elm.get() << endl;
}
}
5 “惊群“现象¶
当多个线程等待一个资源时,如果使用的是 notify_all 方式,所有阻塞的线程都要被唤醒,但同时只能一个线程获取资源。这种现象会导致 cpu 资源无效浪费。
6 一些锁的封装¶
6.1 嵌套锁¶
- 解释:在同一个线程中可以多次调用锁而不会陷入死锁。
6.1.1 实现原理¶
额外增加 2 个变量,分别记录当前加锁的 线程号 和 加锁次数。加锁时 判断是否是已加锁线程,如果是则为嵌套加锁,更新加锁次数即可;如果不是,则调用系统加锁接口(如 pthread_mutex_lock),同时更新加锁次数为 1。解锁时,加锁次数减 1,同时判断加锁次数是否等于 0,如果等于 0 可以调用系统接口(如 pthread_mutex_unlock)释放锁,同时线程号重置为 0;如果大于 0,不用解锁。
6.1.2 源码¶
#include <pthread.h>
// 全局锁(抽象类),一般用于锁定系统级资源,使用场景,比如:初始化
class LockBase {
public:
LockBase(){};
virtual ~LockBase() = 0;
virtual int Lock(){};
virtual int TimedLock(int time_sec){};
virtual int UnLock(){};
};
// 全局嵌套锁,可以在调用链上多次加锁而不会导致死锁
class NestLock : public LockBase {
private:
pthread_t tid_;
int lock_time_;
pthread_mutex_t lock_;
public:
NestLock();
~NestLock();
virtual int Lock();
virtual int TimedLock(int time_sec);
virtual int UnLock();
};
LockBase::~LockBase() {}
NestLock::NestLock() {
tid_ = 0;
lock_time_ = 0;
pthread_mutex_init(&lock_, NULL);
}
NestLock::~NestLock() { pthread_mutex_destroy(&lock_); }
int NestLock::Lock() {
if (pthread_self() != tid_) {
pthread_mutex_lock(&lock_);
tid_ = pthread_self();
lock_time_ = 1;
} else {
// 线程解锁后tid_重新设置成0了,不会出现下面的情况,如果线程号有可能是0,下面的逻辑是必须的
// if (lock_time_ == 0){
// pthread_mutex_lock(&lock_);
// }
++lock_time_;
}
return 0;
}
int NestLock::TimedLock(int time_sec) {
if (pthread_self() != tid_) {
struct timespec time_out;
clock_gettime(CLOCK_REALTIME, &time_out);
time_out.tv_sec += time_sec;
pthread_mutex_timedlock(&lock_, &time_out);
tid_ = pthread_self();
lock_time_ = 1;
} else {
// 线程解锁后tid_重新设置成0了,不会出现下面的情况,如果线程号有可能是0,下面的逻辑是必须的
// if (lock_time_ == 0){
// pthread_mutex_lock(&lock_);
// }
++lock_time_;
}
return 0;
}
int NestLock::UnLock() {
--lock_time_;
if (lock_time_ == 0) {
tid_ = 0;
pthread_mutex_unlock(&lock_);
}
}
6.2 顺序锁¶
- 解释:按照调用 lock 时的顺序获取锁。
6.2.1 实现原理¶
增加额外 2 个变量next_seq_num_
和 tail_seq_num_
,next_seq_num_
一个用来记录下一个 能抢占锁的序号,tail_seq_num_
用来给加锁线程 分配序号。加锁时,按照调用加锁接口的顺序给线程分配一个序号, 此时 tail_seq_num_
+1,当线程抢占到锁后,通过条件判断线程持有的序号是否等于 next_seq_num_
,如果相等,那么加锁成功;否则加锁失败,继续等待被唤醒。解锁时,将 next_seq_num_
+1,解锁,然后唤醒所有阻塞线程。
6.2.2 源码¶
//filename: SequenceLock.hpp
#include <atomic>
#include <chrono>
#include <condition_variable>
//#include <iostream>
#include <mutex>
//#include <thread>
class LockBase
{
public:
LockBase(){};
virtual ~LockBase() = 0;
virtual int Lock(){};
virtual int TimedLock(int time_sec){};
virtual int UnLock(){};
};
LockBase::~LockBase(){};
class SequenceLock : public LockBase
{
public:
int next_seq_num_; // 下一个能抢到锁的序列号
std::atomic_int tail_seq_num_;
std::mutex mtx_;
std::condition_variable cv_;
public:
SequenceLock();
~SequenceLock();
virtual int Lock();
virtual int TimedLock(int time_sec);
virtual int UnLock();
};
SequenceLock::SequenceLock()
{
next_seq_num_ = 1;
tail_seq_num_ = 0;
}
SequenceLock::~SequenceLock() {}
int SequenceLock::Lock()
{
int cur_seq = ++tail_seq_num_;
std::unique_lock<std::mutex> lg(mtx_);
cv_.wait(lg, [cur_seq, this]() { return cur_seq == this->next_seq_num_; });
//std::cout << "locked cur_seq: " << cur_seq << std::endl;
lg.release();
return 0;
}
int SequenceLock::TimedLock(int time_sec)
{
int cur_seq = ++tail_seq_num_;
std::unique_lock<std::mutex> lg(mtx_);
if (cv_.wait_for(lg, std::chrono::seconds(time_sec),
[cur_seq, this]()
{ return cur_seq == this->next_seq_num_; }))
{
//std::cout << "locked cur_seq: " << cur_seq << std::endl;
lg.release();
return 0;
}
else
{
return -1;
}
}
int SequenceLock::UnLock()
{
++next_seq_num_;
mtx_.unlock();
cv_.notify_all();
return 0;
}
- 测试代码
#include "SequenceLock.hpp"
void fun(SequenceLock* sl)
{
for (size_t i = 0; i < 50; i++)
{
sl->Lock();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
sl->UnLock();
}
}
int main()
{
std::cout << "start" << std::endl;
SequenceLock sl;
std::thread t1(fun, &sl);
std::thread t2(fun, &sl);
std::thread t3(fun, &sl);
std::thread t4(fun, &sl);
t1.join();
t3.join();
t2.join();
t4.join();
std::cout << "end" << std::endl;
return 1;
}
6.2.2.1 v1.1 增加 trylock¶
增加一个 b_try_lock_ 变量记录当前 mutex 是否被 try_lock 持有。为什么增加这个变量,是因为 trylock 时不增加序号,unlock 时也就不需要给 next_seq_num_+1。
// filename: sequencelock.cc
// compiler: g++ -g --std=c++11 -pthread sequencelock.cc -o sequencelock
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
class LockBase
{
public:
LockBase(){};
virtual ~LockBase() = 0;
virtual int Lock(){};
virtual int TryLock(){};
virtual int TimedLock(int time_sec){};
virtual int UnLock(){};
};
LockBase::~LockBase(){};
class SequenceLock : public LockBase
{
private:
volatile bool b_try_lock_;
int next_seq_num_; // 下一个能抢到锁的序列号
std::atomic_int tail_seq_num_;
std::mutex mtx_;
std::condition_variable cv_;
public:
SequenceLock();
~SequenceLock();
virtual int Lock();
virtual int TryLock();
virtual int TimedLock(int time_sec);
virtual int UnLock();
};
SequenceLock::SequenceLock()
{
b_try_lock_ = false;
next_seq_num_ = 1;
tail_seq_num_ = 0;
}
SequenceLock::~SequenceLock() {}
int SequenceLock::Lock()
{
int cur_seq = ++tail_seq_num_;
std::unique_lock<std::mutex> lg(mtx_);
cv_.wait(lg, [cur_seq, this]() { return cur_seq == this->next_seq_num_; });
//std::cout << "locked cur_seq: " << cur_seq << std::endl;
lg.release();
return 0;
}
int SequenceLock::TryLock()
{
std::unique_lock<std::mutex> lg(mtx_, std::defer_lock);
if (lg.try_lock())
{
b_try_lock_ = true;
//std::cout << "TryLock next_seq_num_: " << next_seq_num_ << std::endl;
lg.release();
return 0;
}
else
{
return -1;
}
}
int SequenceLock::TimedLock(int time_sec)
{
int cur_seq = ++tail_seq_num_;
std::unique_lock<std::mutex> lg(mtx_);
if (cv_.wait_for(lg, std::chrono::seconds(time_sec),
[cur_seq, this]()
{ return cur_seq == this->next_seq_num_; }))
{
// std::cout << "locked cur_seq: " << cur_seq << std::endl;
lg.release();
return 0;
}
else
{
return -1;
}
}
int SequenceLock::UnLock()
{
if (!b_try_lock_)
{
++next_seq_num_;
}
else
{
b_try_lock_ = false;
}
mtx_.unlock();
cv_.notify_all();
return 0;
}
void fun(SequenceLock* sl)
{
for (size_t i = 0; i < 10; i++)
{
sl->Lock();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
sl->UnLock();
}
}
void tryfun(SequenceLock* sl)
{
for (size_t i = 0; i < 50; i++)
{
if (sl->TryLock() == 0)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::cout << "TryLock SUCCESS " << std::endl;
sl->UnLock();
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::cout << "TryLock failed " << std::endl;
}
}
}
int main()
{
std::cout << "start" << std::endl;
SequenceLock sl;
std::thread t1(fun, &sl);
// std::thread t2(fun, &sl);
// std::thread t3(fun, &sl);
// std::thread t4(fun, &sl);
std::thread t5(tryfun, &sl);
t5.join();
t1.join();
// t3.join();
// t2.join();
// t4.join();
std::cout << "end" << std::endl;
return 1;
}
6.2.3 疑点分析¶
为什么不用
notify_one()
? 使用 notify_one 可能会导致线程一直阻塞在 wait 上。 为什么调用lg.release()
? 因为 unique_lock 会自动释放锁,但我们不需要释放锁,这时使用lg.release()
就将 unique_lock 和 mutex 分开了,就不会自动释放锁了。