跳转至

c++ 线程

线程可以总结成 2 部分,一个是如何启动线程,另一个是如何保证线程安全

1 启动线程

1.1 thread

  • 头文件:#include<thread>

thread 是一个类,构造 thread 对象时传入函数以及参数,thread 会启动一个线程

#include<thread>
#include<iostream>
using namespace std;
int add(int x, int y)
{
    cout << "x + y = " << x + y << endl;
    return x + y;
}

int main()
{
    thread t(add, 2, 3);
    t.join();
}

thread 类有几个重要的成员函数 - thread::join():阻塞等待线程执行完成 - thread::detach:卸离线程到后台执行(主线程无法控制线程了) - thread::get_id:获取线程 id - thread::joinable():判断 thread 是否有一个关联线程(d 当 thread 对象由默认构造创建时就是一个没有关联线程的 thread,这也是没意义的)

1.2 future

  • 头文件:#include<future>
  • 来源:
  • async
  • packaged_task
  • promise
c++11不仅增加了thread还增加了future,这使得可以获得线程执行结果(即线程函数返回值),而这在thread中是无法直接获取返回值的(在以前的方案中往往把需要的结果以参数形式传给函数)。future由async或packaged_task产生。
  • future 有以下几个重要函数
  • future::get():获取 future 保有的线程函数返回值 (阻塞直到线程执行完成)
  • future::wait():阻塞直到线程完成
  • future::wait_for():阻塞一段时间,但并不保证线程能够执行完
  • future::wait_until():阻塞直到某一时间,但并不保证线程能够执行完
  • future::valid() 判断 future 是否持有线程函数状态
  • future::share():产生一个 shared_future,并使当前 future 失效

shared_futurefuture 不同在于,future 只能使用一次 get 函数就会使 future 持有的状态失效(即再次调用 get 将会失败),调用 valid 会返回 false; 而 shared_future 能够多次使用 get 来获取线程函数执行完返回值。

#include<future>
#include<iostream>
using namespace std;
int add(int x, int y)
{
    cout << "x + y = " << x + y << endl;
    return x + y;
}

int main()
{
    future<int> fi(async(add, 2, 3));
    int iv=fi.get();
}

1.3 async

async 是一个函数,用于启动一个线程并返回一个 future,相当于把线程的执行权交给了 future

  • 示例
    #include <algorithm>
    #include <future>
    #include <iostream>
    #include <mutex>
    #include <numeric>
    #include <string>
    #include <vector>
    
    std::mutex m;
    
    struct X
    {
        void foo(int i, const std::string& str)
        {
            std::lock_guard<std::mutex> lk(m);
            std::cout << str << ' ' << i << '\n';
        }
    
        void bar(const std::string& str)
        {
            std::lock_guard<std::mutex> lk(m);
            std::cout << str << '\n';
        }
    
        int operator()(int i)
        {
            std::lock_guard<std::mutex> lk(m);
            std::cout << i << '\n';
            return i + 10;
        }
    };
    
    template<typename RandomIt>
    int parallel_sum(RandomIt beg, RandomIt end)
    {
        auto len = end - beg;
        if (len < 1000)
            return std::accumulate(beg, end, 0);
    
        RandomIt mid = beg + len / 2;
        auto handle = std::async(std::launch::async,
                                 parallel_sum<RandomIt>, mid, end);
        int sum = parallel_sum(beg, mid);
        return sum + handle.get();
    }
    
    int main()
    {
        std::vector<int> v(10000, 1);
        std::cout << "和为 " << parallel_sum(v.begin(), v.end()) << '\n';
    
        X x;
        // 以默认策略调用 x.foo(42, "Hello") :
        // 可能同时打印 "Hello 42" 或延迟执行
        auto a1 = std::async(&X::foo, &x, 42, "Hello");
        // 以 deferred 策略调用 x.bar("world!")
        // 调用 a2.get() 或 a2.wait() 时打印 "world!"
        auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
        // 以 async 策略调用 X()(43) :
        // 同时打印 "43"
        auto a3 = std::async(std::launch::async, X(), 43);
        a2.wait();                     // 打印 "world!"
        std::cout << a3.get() << '\n'; // 打印 "53"
    } // 若 a1 在此点未完成,则 a1 的析构函数在此打印 "Hello 42"
    
  • 可能输出
    和为 10000
    43
    world!
    53
    Hello 42
    

1.4 packaged_task

packaged_task 是一个类,一般用于线程池中,它并不启动一个线程,但他会返回一个 future,packaged_task 内部实现了 () 运算符,一般在另一个线程中去执行,在另一线程调用 packaged_task::get_future() 返回的 future 对象去获取线程 packaged_task()(函数对象) 执行结果

  • 示例
    #include <iostream>
    #include <cmath>
    #include <thread>
    #include <future>
    #include <functional>
    
    // 避免对 std::pow 重载集消歧义的独有函数
    int f(int x, int y) { return std::pow(x,y); }
    
    void task_lambda()
    {
        std::packaged_task<int(int,int)> task([](int a, int b) {
            return std::pow(a, b); 
        });
        std::future<int> result = task.get_future();
    
        task(2, 9);
    
        std::cout << "task_lambda:\t" << result.get() << '\n';
    }
    
    void task_bind()
    {
        std::packaged_task<int()> task(std::bind(f, 2, 11));
        std::future<int> result = task.get_future();
    
        task();
    
        std::cout << "task_bind:\t" << result.get() << '\n';
    }
    
    void task_thread()
    {
        std::packaged_task<int(int,int)> task(f);
        std::future<int> result = task.get_future();
    
        std::thread task_td(std::move(task), 2, 10);
        task_td.join();
    
        std::cout << "task_thread:\t" << result.get() << '\n';
    }
    
    int main()
    {
        task_lambda();
        task_bind();
        task_thread();
    }
    
  • 输出
    task_lambda: 512
    task_bind:   2048
    task_thread: 1024
    

1.5 promise

类模板 std::promise 提供存储值或异常的设施,之后通过 std::promise 对象所创建的 std::future 对象异步获得结果。注意 std::promise 只应当使用一次。· - 示例

#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>

void accumulate(std::vector<int>::iterator first,
                std::vector<int>::iterator last,
                std::promise<int> accumulate_promise)
{
    int sum = std::accumulate(first, last, 0);
    accumulate_promise.set_value(sum);  // 提醒 future
}

void do_work(std::promise<void> barrier)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    barrier.set_value();
}

int main()
{
    // 演示用 promise<int> 在线程间传递结果。
    std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
    std::promise<int> accumulate_promise;
    std::future<int> accumulate_future = accumulate_promise.get_future();
    std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
                            std::move(accumulate_promise));

    // future::get() 将等待直至该 future 拥有合法结果并取得它
    // 无需在 get() 前调用 wait()
    //accumulate_future.wait();  // 等待结果
    std::cout << "result=" << accumulate_future.get() << '\n';
    work_thread.join();  // wait for thread completion

    // 演示用 promise<void> 在线程间对状态发信号
    std::promise<void> barrier;
    std::future<void> barrier_future = barrier.get_future();
    std::thread new_work_thread(do_work, std::move(barrier));
    barrier_future.wait();
    new_work_thread.join();
}
- 输出
result=21

1.6 this_thread

命名空间 this_thread 提供了一些便于操作线程的函数 - this_thread::get_id() - this_thread::sleep_for(dur) - this_thread::sleep_until(tp) - this_thread::yield():放弃当前线程的时间片(相当于告诉 cpu 去执行下一线程,我这不执行),用于轮询等待某一任务完成时能发挥重要作用

while(!readyFlag)
{
    this_thread::yield();
}

1.7 总结

  • 真正可以启动一个线程的是 async(函数)和 thread(类)
  • future 使获取线程函数执行结果更方便
  • packaged_task 一般用于线程池实现

2 atomic

atomic 用来处理原子操作,比如对一个数据读写,就可以使用,或者作为一个条件在某些场合可以取代 condition_variable

#include<atomic>
#include<iostream>
using namespace std;
int main()
{
    //atomic
    {
        atomic<bool> b(false);
        b.store(true);
        while (b.load())
        {
            cout << "hello world" << endl;
        }
    }
}

2.1 compare_exchange_weak 和 compare_exchange_strong

方法签名 序号 标准支持
bool compare_exchange_weak( T& expected, T desired, std::memory_order) success, std::memory_order failure ) noexcept; (1) (C++11 起)
bool compare_exchange_weak( T& expected, T desired, std::memory_order success, std::memory_order failure ) volatile noexcept; (2) (C++11 起)
bool compare_exchange_weak( T& expected, T desired, std::memory_order order = std::memory_order_seq_cst ) noexcept; (3) (C++11 起)
bool compare_exchange_weak( T& expected, T desired, std::memory_order order = std::memory_order_seq_cst ) volatile noexcept; (4) (C++11 起)
bool compare_exchange_strong( T& expected, T desired, std::memory_order success, std::memory_order failure ) noexcept; (5) (C++11 起)
bool compare_exchange_strong( T& expected, T desired, std::memory_order success, std::memory_order failure ) volatile noexcept; (6) (C++11 起)
bool compare_exchange_strong( T& expected, T desired, std::memory_order order = std::memory_order_seq_cst ) noexcept; (7) (C++11 起)
bool compare_exchange_strong( T& expected, T desired, std::memory_order order = std::memory_order_seq_cst ) volatile noexcept; (8) (C++11 起)
以上方法参数解释如下:
  • expected:期待的值,如果和实际值不相等,那么返回 false,并且 expected 被设置成实际值。(注意此为引用,因此可以被用来设置成实际的值。)
  • desired:待修改的值,当 expected 等于实际值时,那么实际值就被更新成 desired 的值。
  • success:读修改写操作所用的内存同步顺序。
  • failure:加载操作所用的内存同步顺序。
  • order:两个操作所用的内存同步顺序。 返回值:更新成功返回 true,失败返回 false。 compare_exchange_weak 和 compare_exchange_strong 最大的区别时:
  • compare_exchange_weak 可能会虚假地失败,即比较相等但还是返回 false。
  • compare_exchange_weak 性能比 compare_exchange_strong 好。

2.2 is_lock_free

检查此类型所有对象上的原子操作是否免锁。一般结构体大小大于 8 不免锁。

#include <iostream>
#include <atomic>
struct A{
    int a;
    int b;
};
struct B{
    int a;
    double b;
};
int main() {
    std::atomic<int> atomicInt;
    std::atomic<double> atomicDouble;
    std::atomic<A> atomicA;
    std::atomic<B> atomicB;

    bool intLockFree = atomicInt.is_lock_free();
    bool doubleLockFree = atomicDouble.is_lock_free();
    bool aLockFree = atomicA.is_lock_free();
    bool bLockFree = atomicB.is_lock_free();

    std::cout << "int is lock-free: " << intLockFree << std::endl;
    std::cout << "double is lock-free: " << doubleLockFree << std::endl;
    std::cout << "A is lock-free: " << aLockFree << std::endl;
    std::cout << "B is lock-free: " << bLockFree << std::endl;

    return 0;
}

输出结果:

int is lock-free: 1 
double is lock-free: 1 
A is lock-free: 1 
B is lock-free: 0

2.3 load、store、exchange

  • T load( std::memory_order order = std:: memory_order_seq_cst ) const noexcept;:原子地获得原子对象的值。
  • void store( T desired, std::memory_order order = std:: memory_order_seq_cst ) noexcept; :原子地以非原子对象替换原子对象的值。
  • T exchange( T desired, std::memory_order order = std:: memory_order_seq_cst ) noexcept;:原子地替换原子对象的值并获得它先前持有的值。

store 和 exchange 不同在于 exchange 会返回修改前的值。

2.4 其它接口

方法签名 解释
fetch_add 原子地将参数加到存储于原子对象的值,并返回先前保有的值 (公开成员函数)
fetch_sub 原子地从存储于原子对象的值减去参数,并获得先前保有的值 (公开成员函数)
fetch_and 原子地进行参数和原子对象的值的逐位与,并获得先前保有的值 (公开成员函数)
fetch_or 原子地进行参数和原子对象的值的逐位或,并获得先前保有的值 (公开成员函数)
fetch_xor 原子地进行参数和原子对象的值的逐位异或,并获得先前保有的值 (公开成员函数)
operator++ operator++(int) operator-- operator--(int) 令原子值增加或减少一 (公开成员函数)
operator+= operator-= operator&= operator\|= operator^= 加、减,或与原子值进行逐位与、或、异或 (公开成员函数)

2.5 memory_order

  1. memory_order_relaxed
  2. 最轻量级的内存顺序要求。
  3. 不对操作进行任何排序,也不提供任何同步。
  4. 适用于不依赖于操作顺序的独立读取和写入操作。

  5. memory_order_consume

  6. 用于指示数据依赖关系,通常用于读取依赖于之前写入的数据。
  7. 并没有广泛支持,因此不太常用。

  8. memory_order_acquire

  9. 用于读取操作,确保在此操作之后的读取不会被重排序到这个操作之前。
  10. 用于实现读取依赖,确保读取到最新的数据。

  11. memory_order_release

  12. 用于写入操作,确保在此操作之前的写入不会被重排序到这个操作之后。
  13. 用于实现写入依赖,确保写入的数据对其他线程可见。

  14. memory_order_acq_rel

  15. 用于既包含读取又包含写入操作的情况,提供了一种更强的同步保证。

  16. memory_order_seq_cst

  17. 最强的内存顺序要求。
  18. 对操作进行全局排序,确保所有线程都能看到相同的操作顺序。
  19. 适用于需要强一致性保证的情况,但可能在性能上有一定开销。

2.6 示例

#include <atomic>
 
template<typename T>
struct node
{
    T data;
    node* next;
    node(const T& data) : data(data), next(nullptr) {}
};
 
template<typename T>
class stack
{
    std::atomic<node<T>*> head;
public:
    void push(const T& data)
    {
        node<T>* new_node = new node<T>(data);
 
        // 将 head 的当前值放到 new_node->next 中
        new_node->next = head.load(std::memory_order_relaxed);
 
        // 现在令 new_node 为新的 head ,但如果 head 不再是
        // 存储于 new_node->next 的值(某些其他线程必须在刚才插入结点)
        // 那么将新的 head 放到 new_node->next 中并再尝试
        while(!head.compare_exchange_weak(new_node->next, new_node,
                                          std::memory_order_release,
                                          std::memory_order_relaxed))
            ; // 循环体为空
 
// 注意:上述使用至少在这些版本不是线程安全的
// 先于 4.8.3 的 GCC(漏洞 60272),先于 2014-05-05 的 clang(漏洞 18899)
// 先于 2014-03-17 的 MSVC(漏洞 819819)。下面是变通方法:
//      node<T>* old_head = head.load(std::memory_order_relaxed);
//      do
//      {
//          new_node->next = old_head;
//      }
//      while (!head.compare_exchange_weak(old_head, new_node,
//                                         std::memory_order_release,
//                                         std::memory_order_relaxed));
    }
};
 
int main()
{
    stack<int> s;
    s.push(1);
    s.push(2);
    s.push(3);
}
#include <atomic>
#include <iostream>
 
std::atomic<int> ai;
 
int tst_val = 4;
int new_val = 5;
bool exchanged= false;
 
void valsout()
{
    std::cout << "ai = " << ai
          << "  tst_val = " << tst_val
          << "  new_val = " << new_val
          << "  exchanged = " << std::boolalpha << exchanged
          << "\n";
}
 
int main()
{
    ai= 3;
    valsout();
 
    // tst_val != ai   ==>  tst_val 被修改
    exchanged= ai.compare_exchange_strong(tst_val, new_val);
    valsout();
 
    // tst_val == ai   ==>  ai 被修改
    exchanged= ai.compare_exchange_strong(tst_val, new_val);
    valsout();
}

3 线程锁

3.1 mutex 和 lock

  • mutex:互斥量可以直接锁住,但通常会通过 lock_guard 来锁
  • recursive_mutex: 同 mutex,一般使用循环中,如一个函数调用另一个函数,这 2 个函数都持有相同的 metux,这是 recursive_mutex 就能避免死循环问题
  • lock_guard:持有 mutex,锁住并自动释放锁
  • unique_lock: 同 lock_guard,但多了些其它控制函数,比如可以主动调用 unique_lock::unlock() 解锁
#include<mutex>
#include<iostream>
using namespace std;
int add(int x, int y)
{
    cout << "x + y = " << x + y << endl;
    return x + y;
}

int main()
{
    //metux
    {
        mutex mt;
        {
            mt.lock();
            add(2, 3);
            mt.unlock();
        }
        {
            lock_guard<mutex> lg(mt);
            add(2, 3);
        }
        {
            unique_lock<mutex> ulg(mt);
            add(2, 3);
            ulg.unlock();
        }
    }

}

3.2 condition_variable

condition_variable 条件变量,顾名思义就是用来当达到某一条件时才去执行某一操作。 - condition_variableunique_lock 一同使用 - condition_variable 有几个重要函数 - condition_variable::notify_one():唤醒一个等待线程 - condition_variable::notify_notify_all:唤醒所有等待线程 - condition_variable::wait(unique_lock,bool);:等待条件满足 - 示例如下

#include<future>
#include<thread>
#include<condition_variable>
#include<mutex>
#include<iostream>
using namespace std;

int main()
{
    //condition_variable
    {
        mutex mt;
        bool b = false;
        condition_variable cv;
        future<void> fv = async([&cv,&mt,&b] {
            unique_lock<mutex> ug(mt);
            cv.wait(ug, [&b] {              
                return b;
            });
            cout << "hello world!" << endl; 
        });
        thread t([&cv, &mt, &b] {
            unique_lock<mutex> ug(mt);
            cv.notify_one();
            b = true;
            cout << "notify" << endl;
        });     
        fv.get();
        t.join();
    }
}

输出结果

notify
hello world!

4 线程池

  • 实现代码
#include<functional>
#include<type_traits>
#include<memory>
#include<mutex>
#include<future>
#include<atomic>
#include<condition_variable>
#include<thread>
#include<queue>
#include<vector>
class ThreadPool
{
private:
    using task = std::function<void()>;
private:
    int m_tNum;//线程池大小
    int m_tunusedNum;//可用线程数
    std::queue<task> m_tasks;//任务队列
    std::vector<std::thread> m_threads;//线程容器
    std::mutex m_m;
    std::condition_variable m_cv;
    std::atomic<bool> m_stop{ false };
public:
    ThreadPool(const unsigned int& sz);
    ~ThreadPool();
    template<typename F, typename ...Args>
    auto addTask(F&& f, Args&& ...args)->std::future<typename std::result_of_t<F(Args...)>>;
private:
    void createThreadPool(const unsigned int& sz);
};
ThreadPool::ThreadPool(const unsigned int& sz) :m_tNum(sz)
{
    //创建线程
    createThreadPool(sz);
}
ThreadPool::~ThreadPool()
{
    m_stop.store(true);
    m_cv.notify_all();
    for (auto& t : m_threads)
    {
        if (t.joinable())
            t.join();
    }

}
void ThreadPool::createThreadPool(const unsigned int& sz)
{
    for (size_t i = 0; i < sz; ++i)
    {
        m_threads.emplace_back([this]() {
            while (true)
            {
                task t;
                {
                    std::unique_lock<std::mutex> lg(m_m);
                    //激活条件:1.线程池不再使用。2.任务队列不为空。
                    m_cv.wait(lg, [this]()->bool {
                        return m_stop.load() || !m_tasks.empty();
                        });
                    //线程池不再使用,终结此线程
                    if (m_stop.load())
                        return;
                    //从任务队列取出任务,并执行
                    t = m_tasks.front();
                    m_tasks.pop();
                }
                //执行任务
                t();
            }
            });
    }
}

template<typename F, typename ...Args>
auto ThreadPool::addTask(F&& f, Args&& ...args)->std::future<typename std::result_of_t<F(Args...)>>
{
    using RET_TYPE = typename std::result_of_t<F(Args...)>;
    //创建智能指针保存 packaged_task类型对象,packaged_task持有ET_TYPE()函数
    auto sh_task = std::make_shared<std::packaged_task<RET_TYPE()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    //将新任务放入任务队列中
    {
        std::unique_lock<std::mutex> lg(m_m);
        //将任务放入void()函数中,再放入任务队列中
        m_tasks.emplace([sh_task] {
            (*sh_task)();
            });
    }
    //唤醒一个线程
    m_cv.notify_one();
    return sh_task->get_future();
}
  • 测试代码
#include<iostream>
using namespace std;
int main() {
    ThreadPool pool(5);
    vector<future<int>> res;
    for (size_t i = 0; i < 5; i++)
    {
        res.emplace_back(pool.addTask([](int j) {
            printf("函数%d,线程ID %d\n",j,this_thread::get_id());
            return j*j;
            },i));
    }
    for (auto & elm:res)
    {
        cout << elm.get() << endl;
    }   
}

5 “惊群“现象

当多个线程等待一个资源时,如果使用的是 notify_all 方式,所有阻塞的线程都要被唤醒,但同时只能一个线程获取资源。这种现象会导致 cpu 资源无效浪费。

6 一些锁的封装

6.1 嵌套锁

  • 解释:在同一个线程中可以多次调用锁而不会陷入死锁。

6.1.1 实现原理

额外增加 2 个变量,分别记录当前加锁的 线程号加锁次数加锁时 判断是否是已加锁线程,如果是则为嵌套加锁,更新加锁次数即可;如果不是,则调用系统加锁接口(如 pthread_mutex_lock),同时更新加锁次数为 1。解锁时,加锁次数减 1,同时判断加锁次数是否等于 0,如果等于 0 可以调用系统接口(如 pthread_mutex_unlock)释放锁,同时线程号重置为 0;如果大于 0,不用解锁。

6.1.2 源码

#include <pthread.h>
// 全局锁(抽象类),一般用于锁定系统级资源,使用场景,比如:初始化
class LockBase {
public:
    LockBase(){};
    virtual ~LockBase() = 0;
    virtual int Lock(){};
    virtual int TimedLock(int time_sec){};
    virtual int UnLock(){};
};

// 全局嵌套锁,可以在调用链上多次加锁而不会导致死锁
class NestLock : public LockBase {
private:
    pthread_t tid_;
    int lock_time_;
    pthread_mutex_t lock_;

public:
    NestLock();
    ~NestLock();
    virtual int Lock();
    virtual int TimedLock(int time_sec);
    virtual int UnLock();
};


LockBase::~LockBase() {}

NestLock::NestLock() {
    tid_ = 0;
    lock_time_ = 0;
    pthread_mutex_init(&lock_, NULL);
}
NestLock::~NestLock() { pthread_mutex_destroy(&lock_); }
int NestLock::Lock() {
    if (pthread_self() != tid_) {
        pthread_mutex_lock(&lock_);
        tid_ = pthread_self();
        lock_time_ = 1;
    } else {
        // 线程解锁后tid_重新设置成0了,不会出现下面的情况,如果线程号有可能是0,下面的逻辑是必须的
        // if (lock_time_ == 0){
        //     pthread_mutex_lock(&lock_);
        // }
        ++lock_time_;
    }
    return 0;
}
int NestLock::TimedLock(int time_sec) {
    if (pthread_self() != tid_) {
        struct timespec time_out;
        clock_gettime(CLOCK_REALTIME, &time_out);
        time_out.tv_sec += time_sec;
        pthread_mutex_timedlock(&lock_, &time_out);
        tid_ = pthread_self();
        lock_time_ = 1;
    } else {
        // 线程解锁后tid_重新设置成0了,不会出现下面的情况,如果线程号有可能是0,下面的逻辑是必须的
        // if (lock_time_ == 0){
        //     pthread_mutex_lock(&lock_);
        // }
        ++lock_time_;
    }
    return 0;
}
int NestLock::UnLock() {
    --lock_time_;
    if (lock_time_ == 0) {
        tid_ = 0;
        pthread_mutex_unlock(&lock_);
    }
}

6.2 顺序锁

  • 解释:按照调用 lock 时的顺序获取锁。

6.2.1 实现原理

增加额外 2 个变量next_seq_num_tail_seq_num_next_seq_num_ 一个用来记录下一个 能抢占锁的序号tail_seq_num_ 用来给加锁线程 分配序号加锁时,按照调用加锁接口的顺序给线程分配一个序号, 此时 tail_seq_num_+1,当线程抢占到锁后,通过条件判断线程持有的序号是否等于 next_seq_num_,如果相等,那么加锁成功;否则加锁失败,继续等待被唤醒。解锁时,将 next_seq_num_+1,解锁,然后唤醒所有阻塞线程。

6.2.2 源码

//filename: SequenceLock.hpp
#include <atomic>
#include <chrono>
#include <condition_variable>
//#include <iostream>
#include <mutex>
//#include <thread>

class LockBase
{
public:
    LockBase(){};
    virtual ~LockBase() = 0;
    virtual int Lock(){};
    virtual int TimedLock(int time_sec){};
    virtual int UnLock(){};
};
LockBase::~LockBase(){};
class SequenceLock : public LockBase
{
public:
    int next_seq_num_;  // 下一个能抢到锁的序列号
    std::atomic_int tail_seq_num_;
    std::mutex mtx_;
    std::condition_variable cv_;

public:
    SequenceLock();
    ~SequenceLock();
    virtual int Lock();
    virtual int TimedLock(int time_sec);
    virtual int UnLock();
};
SequenceLock::SequenceLock()
{
    next_seq_num_ = 1;
    tail_seq_num_ = 0;
}
SequenceLock::~SequenceLock() {}
int SequenceLock::Lock()
{
    int cur_seq = ++tail_seq_num_;
    std::unique_lock<std::mutex> lg(mtx_);
    cv_.wait(lg, [cur_seq, this]() { return cur_seq == this->next_seq_num_; });
    //std::cout << "locked cur_seq: " << cur_seq << std::endl;
    lg.release();
    return 0;
}
int SequenceLock::TimedLock(int time_sec)
{
    int cur_seq = ++tail_seq_num_;
    std::unique_lock<std::mutex> lg(mtx_);
    if (cv_.wait_for(lg, std::chrono::seconds(time_sec),
                     [cur_seq, this]()
                     { return cur_seq == this->next_seq_num_; }))
    {
        //std::cout << "locked cur_seq: " << cur_seq << std::endl;
        lg.release();
        return 0;
    }
    else
    {
        return -1;
    }
}
int SequenceLock::UnLock()
{
    ++next_seq_num_;
    mtx_.unlock();
    cv_.notify_all();
    return 0;
}
  • 测试代码
#include "SequenceLock.hpp"
void fun(SequenceLock* sl)
{
    for (size_t i = 0; i < 50; i++)
    {
        sl->Lock();
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        sl->UnLock();
    }
}
int main()
{
    std::cout << "start" << std::endl;
    SequenceLock sl;
    std::thread t1(fun, &sl);
    std::thread t2(fun, &sl);
    std::thread t3(fun, &sl);
    std::thread t4(fun, &sl);
    t1.join();
    t3.join();
    t2.join();
    t4.join();
    std::cout << "end" << std::endl;
    return 1;
}
6.2.2.1 v1.1 增加 trylock

增加一个 b_try_lock_ 变量记录当前 mutex 是否被 try_lock 持有。为什么增加这个变量,是因为 trylock 时不增加序号,unlock 时也就不需要给 next_seq_num_+1。

// filename: sequencelock.cc
// compiler: g++ -g --std=c++11 -pthread sequencelock.cc -o sequencelock
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class LockBase
{
public:
    LockBase(){};
    virtual ~LockBase() = 0;
    virtual int Lock(){};
    virtual int TryLock(){};
    virtual int TimedLock(int time_sec){};
    virtual int UnLock(){};
};
LockBase::~LockBase(){};
class SequenceLock : public LockBase
{
private:
    volatile bool b_try_lock_;
    int next_seq_num_;  // 下一个能抢到锁的序列号
    std::atomic_int tail_seq_num_;
    std::mutex mtx_;
    std::condition_variable cv_;

public:
    SequenceLock();
    ~SequenceLock();
    virtual int Lock();
    virtual int TryLock();
    virtual int TimedLock(int time_sec);
    virtual int UnLock();
};
SequenceLock::SequenceLock()
{
    b_try_lock_ = false;
    next_seq_num_ = 1;
    tail_seq_num_ = 0;
}
SequenceLock::~SequenceLock() {}
int SequenceLock::Lock()
{
    int cur_seq = ++tail_seq_num_;
    std::unique_lock<std::mutex> lg(mtx_);
    cv_.wait(lg, [cur_seq, this]() { return cur_seq == this->next_seq_num_; });
    //std::cout << "locked cur_seq: " << cur_seq << std::endl;
    lg.release();
    return 0;
}
int SequenceLock::TryLock()
{
    std::unique_lock<std::mutex> lg(mtx_, std::defer_lock);
    if (lg.try_lock())
    {
        b_try_lock_ = true;
        //std::cout << "TryLock next_seq_num_: " << next_seq_num_ << std::endl;
        lg.release();
        return 0;
    }
    else
    {
        return -1;
    }
}
int SequenceLock::TimedLock(int time_sec)
{
    int cur_seq = ++tail_seq_num_;
    std::unique_lock<std::mutex> lg(mtx_);
    if (cv_.wait_for(lg, std::chrono::seconds(time_sec),
                     [cur_seq, this]()
                     { return cur_seq == this->next_seq_num_; }))
    {
        // std::cout << "locked cur_seq: " << cur_seq << std::endl;
        lg.release();
        return 0;
    }
    else
    {
        return -1;
    }
}
int SequenceLock::UnLock()
{
    if (!b_try_lock_)
    {
        ++next_seq_num_;
    }
    else
    {
        b_try_lock_ = false;
    }
    mtx_.unlock();
    cv_.notify_all();
    return 0;
}

void fun(SequenceLock* sl)
{
    for (size_t i = 0; i < 10; i++)
    {
        sl->Lock();
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        sl->UnLock();
    }
}
void tryfun(SequenceLock* sl)
{
    for (size_t i = 0; i < 50; i++)
    {
        if (sl->TryLock() == 0)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::cout << "TryLock SUCCESS " << std::endl;
            sl->UnLock();
        }
        else
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::cout << "TryLock failed " << std::endl;
        }
    }
}
int main()
{
    std::cout << "start" << std::endl;
    SequenceLock sl;
    std::thread t1(fun, &sl);
    // std::thread t2(fun, &sl);
    // std::thread t3(fun, &sl);
    // std::thread t4(fun, &sl);
    std::thread t5(tryfun, &sl);
    t5.join();
    t1.join();
    // t3.join();
    // t2.join();
    // t4.join();

    std::cout << "end" << std::endl;
    return 1;
}

6.2.3 疑点分析

为什么不用 notify_one()? 使用 notify_one 可能会导致线程一直阻塞在 wait 上。 为什么调用 lg.release()? 因为 unique_lock 会自动释放锁,但我们不需要释放锁,这时使用 lg.release() 就将 unique_lock 和 mutex 分开了,就不会自动释放锁了。