本系列笔记参考《C++ Concurrency in Action, 2nd Edition》 及其中文翻译 。
使用互斥量 不推荐直接去调用 std::mutex 的成员函数,调用其成员函数就意味着必须在每个函数出口都要去调用 unlock()(包括异常的情况)。C++ 标准库为互斥量提供了 RAII 模板类 std::lock_guard,在构造时就能提供已锁的互斥量,并在析构时进行解锁,从而保证了互斥量能被正确解锁。某些情况下使用全局变量没问题,但大多数情况下,互斥量通常会与需要保护的数据放在同一类中,而不是定义成全局变量。
当然,也不是总能那么理想:当其中一个成员函数返回的是保护数据的指针或引用时,也会破坏数据。具有访问能力的指针或引用可以访问(并可能修改)保护数据,而不会被互斥锁限制。这就需要对接口谨慎设计,要确保互斥量能锁住数据访问,并且不留后门。
接口间的条件竞争 互斥量可以保证多个线程依次访问同一份数据,但是即便如此,仍然会存在竞争问题。以 std::stack 的 top() 和 pop() 接口为例,当一个线程依次调用这两个接口的间隙,另一个线程调用了 pop(),这样前一个线程 pop 出的数据就不是它之前通过 top 获得的,出现了错误。
先说一下为什么标准库的 stack 为什么会把这两个接口分开而不是做在一起。如果这两个功能做在一起,流程就是先移除栈顶元素,再 return 返回该元素。return 的时候会发生元素的拷贝,如果此时拷贝构造函数抛出了异常,那么该栈顶元素就发生了丢失。
为了保证安全拷贝数据,需要将此接口分割。不幸的是,这样的分割却制造了本想避免的条件竞争。幸运的是,我们还有的别的选项,但使用每个选项都有相应的代价。
选项 1:传入一个引用 1 2 std::vector<int > result; some_stack.pop (result);
这种方法的缺点也比较明显:
需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算;对于另外一些类型,这样也不总行得通,因为构造函数需要的参数,在这个阶段不一定可用。
需要可赋值的存储类型,这是一个重大限制。即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。
选项 2:无异常抛出的拷贝构造函数或移动构造函数 尽管能在编译时可使用 std::is_nothrow_copy_constructible
和 std::is_nothrow_move_constructible
,让拷贝或移动构造函数不抛出异常,但是这种方式的局限性太强。用户自定义的类型中,会有不抛出异常的拷贝构造函数或移动构造函数的类型,但是那些有抛出异常的拷贝构造函数的类型往往更多。
选项 3:返回指向弹出值的指针 指针的优势是自由拷贝,并且不会产生异常,这样就能避免前面提到的异常问题了。缺点就是返回指针需要对对象的内存分配进行管理,对于简单数据类型,内存管理的开销要远大于直接返回值。对于这个方案,使用智能指针是个不错的选择,不仅能避免内存泄露,而且标准库能够完全控制内存分配方案,就不需要 new 和 delete 操作。
线程安全的堆栈 下面是一个接口没有竞争的堆栈,它实现了选项 1 和选项 3:重载了 pop(),使用局部引用去存储弹出值,并返回 std::shared_ptr 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 #include <exception> #include <memory> #include <mutex> #include <stack> struct empty_stack : std::exception { const char * what () const throw () { return "empty stack!" ; }; }; template <typename T>class threadsafe_stack {private : std::stack<T> data; mutable std::mutex m; public : threadsafe_stack () : data (std::stack <T>()) {} threadsafe_stack (const threadsafe_stack& other) { std::lock_guard<std::mutex> lock (other.m) ; data = other.data; } threadsafe_stack& operator =(const threadsafe_stack&) = delete ; void push (T new_value) { std::lock_guard<std::mutex> lock (m) ; data.push (new_value); } std::shared_ptr<T> pop () { std::lock_guard<std::mutex> lock (m) ; if (data.empty ()) throw empty_stack (); std::shared_ptr<T> const res (std::make_shared<T>(data.top())) ; data.pop (); return res; } void pop (T& value) { std::lock_guard<std::mutex> lock (m) ; if (data.empty ()) { throw empty_stack (); } value=data.top (); data.pop (); } bool empty () const { std::lock_guard<std::mutex> lock (m) ; return data.empty (); } };
死锁 两个线程试图通过不同的顺序获取多个相同的锁,如果请求的顺序不相同,那么会出现循环的锁依赖现象,产生死锁。这是最常见的死锁情况,如下:
1 2 3 4 5 6 thread A thread B lock (mutex1); lock (mutex2); lock (mutex2); lock (mutex1);
避免死锁的一般建议,就是让这两个线程以相同的顺序获取这些锁。但是即便如此,也可能会有问题。例如 Swap(a, b) 需要访问需要同时访问 a、b 的内部数据,理所当然对 a、b 都加锁再访问,按常规思路首先对第一个参数加锁,然后对第二个参数加锁。当两个线程同时分别调用 Swap(a, b) 和 Swap(b, a) 时,就又出现了上述的“两个线程试图通过不同的顺序获取多个相同的锁”的情况,就有可能发生死锁。
C++ 标准库解决这个问题的方法是通过 std::lock 一次性锁住多个(两个以上)的互斥量。下面的程序代码演示了在一个简单的交换操作用使用 std::lock。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #inlcude <mutex> class some_big_object ;void Swap (some_big_object& lhs,some_big_object& rhs) ;class X {private : some_big_object some_detail; std::mutex m; public : X (some_big_object const & sd):some_detail (sd) {} friend void swap (X& lhs, X& rhs) { if (&lhs==&rhs) { return ; } std::lock (lhs.m, rhs.m); std::lock_guard<std::mutex> lock_a (lhs.m, std::adopt_lock) ; std::lock_guard<std::mutex> lock_b (rhs.m, std::adopt_lock) ; swap (lhs.some_detail, rhs.some_detail); } };
C++17 对这种情况提供了支持,std::scoped_lock 是一种新的 RAII 模板类型,与 std::lock_guard 的功能相同,这个新类型能接受不定数量的互斥量类型作为模板参数,以及相应的互斥量(数量和类型)作为构造参数。Swap 操作可以重写如下:
1 2 3 4 5 6 7 8 9 10 void Swap (X& lhs, X& rhs) { if (&lhs == &rhs) { return ; } std::scoped_lock guard (lhs.m, rhs.m) ; swap (lhs.some_detail, rhs.some_detail); }
避免死锁的进阶指导 避免嵌套锁
线程获得一个锁时,就别再去获取第二个,因为每个线程只持有一个锁时不会产生死锁。
当需要获取多个锁,使用 std::lock 来做这件事(对获取锁的操作上锁),避免产生死锁。
避免在持有锁时调用外部代码 因为代码是外部提供的,所以没有办法确定外部要做什么。外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反上一条指导意见,并造成死锁。当写通用代码时(例如前面提到的栈),每一个操作的参数类型,都是外部提供的定义,这就需要其他指导意见来帮助你了。
使用固定顺序获取锁 以链表为例,链表的每个节点都有一个互斥量保护。为了访问链表,线程必须获取感兴趣节点上的互斥锁。当一个线程删除一个节点,就必须获取三个节点上的互斥锁:将要删除的节点,两个邻接节点。为了遍历链表,线程必须保证在获取当前节点的互斥锁前提下,获得下一个节点的锁,要保证指向下一个节点的指针不会同时被修改。当下一个节点上的锁被获取,第一个节点的锁就可以释放了。
为了避免死锁,节点必须以固定的顺序上锁,如果两个线程试图用互为反向的顺序,执行到链表中间部分时会发生死锁。所以对于链表,避免出现死锁的方式就是不允许反向遍历链表。类似的约定常用于建立其他的数据结构。
使用层次锁 每一个互斥量都拥有一个层级号码,并且严格按照下面两个规则:
当某个线程已经持有层级为 N 的互斥量的时候,只能去获取层次小于 N 的互斥量
当试图同时占有多个同层级的互斥量的时候,这些互斥量必须一次性获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 class hierarchical_mutex { std::mutex internal_mutex; unsigned long const hierarchy_value; unsigned long previous_hierarchy_value; static thread_local unsigned long this_thread_hierarchy_value; void check_for_hierarchy_violation () { if (this_thread_hierarchy_value <= hierarchy_value) { throw std::logic_error (”mutex hierarchy violated”); } } void update_hierarchy_value () { previous_hierarchy_value = this_thread_hierarchy_value; this_thread_hierarchy_value = hierarchy_value; } public : explicit hierarchical_mutex (unsigned long value) : hierarchy_value(value), previous_hierarchy_value(0 ) { } void lock () { check_for_hierarchy_violation (); internal_mutex.lock (); update_hierarchy_value (); } void unlock () { if (this_thread_hierarchy_value != hierarchy_value) { throw std::logic_error (“mutex hierarchy violated”); } this_thread_hierarchy_value = previous_hierarchy_value; internal_mutex.unlock (); } bool try_lock () { check_for_hierarchy_violation (); if (!internal_mutex.try_lock ()) { return false ; } update_hierarchy_value (); return true ; } }; thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value (ULONG_MAX) ;
std::unique_lock——灵活的锁 std::unique_lock 拥有 lock、unlock 接口,比 std::lock_guard 更为灵活,可以随时上锁解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class LogFile {public : LogFile () { f.open ("log.txt" ); } ~LogFile () { f.close (); } void shared_print (string msg, int id) { std::unique_lock<std::mutex> guard (_mu) ; guard.unlock (); guard.lock (); guard.ulock (); } private : std::mutex mutex_; std::ofstream f; };
还可以使用 std::defer_lock 设置初始化的时候不进行默认的上锁操作:
1 2 3 4 5 6 7 8 9 10 void shared_print (string msg, int id) { std::unique_lock<std::mutex> guard (_mu, std::defer_lock) ; guard.lock (); guard.unlock (); guard.lock (); }
保护共享数据的初始化过程 下面的代码是一种比较常见的单线程初始化方式——lazy initialization:
1 2 3 4 5 6 7 8 std::shared_ptr<some_resource> resource_ptr; void foo () { if (!resource_ptr) { resource_ptr.reset (new some_resource); } resource_ptr->do_something (); }
转为多线程代码时,只有初始化资源处需要保护,这样共享数据对于并发访问就是安全的。但是下面的实现会使得线程资源产生不必要的序列化,为了确定数据源已经初始化,每个线程必须等待互斥量。
1 2 3 4 5 6 7 8 9 10 11 12 13 std::shared_ptr<some_resource> resource_ptr; std::mutex resource_mutex; void foo () { std::unique_lock<std::mutex> lk (resource_mutex) ; if (!resource_ptr) { resource_ptr.reset (new some_resource); } lk.unlock (); resource_ptr->do_something (); }
使用 double check 的方式可以避免没必要的串行化:
1 2 3 4 5 6 7 8 9 void undefined_behaviour_with_double_checked_locking () { if (!resource_ptr) { std::lock_guard<std::mutex> lk (resource_mutex) ; if (!resource_ptr) { resource_ptr.reset (new some_resource); } } resource_ptr->do_something (); }
但是 double check 存在潜在的条件竞争:未被锁保护的读取操作 1 没有与其他线程里被锁保护的写入操作 2 进行同步,因此就会产生条件竞争,这个条件竞争不仅覆盖指针本身,还会影响到其指向的对象。
C++ 标准库提供了 std::once_flag 和 std::call_once 来处理这种情况。比起锁住互斥量并显式的检查指针,每个线程只需要使用 std::call_once 就可以。
1 2 3 4 5 6 7 8 9 10 11 std::shared_ptr<some_resource> resource_ptr; std::once_flag resource_flag; void init_resource () { resource_ptr.reset (new some_resource); } void foo () { std::call_once (resource_flag,init_resource); resource_ptr->do_something (); }
还有一种初始化过程中潜存着条件竞争:其中一个局部变量为 static 类型,这种变量的在声明后就已经完成初始化。对于多线程调用的函数,这就意味着这里有条件竞争——抢着去定义这个变量。在 C++11 没有出来的时候,只能靠插入两个 memory barrier(内存屏障)来解决这个错误,但是 C++11 引进了 memory model,提供了 atomic 实现内存的同步访问,即不同线程总是获取对象修改前或修改后的值,无法在对象修改期间获得该对象。换句话说,C++11 规定了 local static 在多线程条件下的初始化行为,要求编译器保证了内部静态变量的线程安全性。
保护不常更新的数据结构 读写锁把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。C++17 开始,标准库提供了shared_mutex 类(在这之前,可以使用 boost::shared_mutex 类或系统相关 api)。shared_mutex 通常用于多个读线程能同时访问同一资源而不导致数据竞争,但只有一个写线程能访问的情形。
下面的代码清单展示了一个简单的 DNS 缓存,使用 std::map 持有缓存数据,使用 std::shared_mutex 进行保护。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #include <map> #include <string> #include <mutex> #include <shared_mutex> class dns_entry ;class dns_cache {public : dns_entry find_entry (std::string const & domain) const { std::shared_lock<std::shared_mutex> lk (entry_mutex) ; std::map<std::string,dns_entry>::const_iterator const it = entries.find (domain); return (it == entries.end ()) ? dns_entry () : it->second; } void update_or_add_entry (std::string const & domain, dns_entry const & dns_details) { std::lock_guard<std::shared_mutex> lk (entry_mutex) ; entries[domain] = dns_details; } private : std::map<std::string, dns_entry> entries; mutable std::shared_mutex entry_mutex; };
嵌套锁 线程对已经获取的 std::mutex 再次上锁是错误的,尝试这样做会导致未定义行为。因此,C++ 标准库提供了 std::recursive_mutex 类。除了可以在同一线程的单个实例上多次上锁,其他功能与 std::mutex 相同。其他线程对互斥量上锁前,当前线程必须释放拥有的所有锁,所以如果你调用 lock() 三次,也必须调用 unlock() 三次。正确使用 std::lock_guardstd::recursive_mutex 和 std::unique_lockstd::recursive_mutex 可以帮你处理这些问题。
使用嵌套锁时,要对代码设计进行改动。嵌套锁一般用在可并发访问的类上,所以使用互斥量保护其成员数据。每个公共成员函数都会对互斥量上锁,然后完成对应的操作后再解锁互斥量。不过,有时成员函数会调用另一个成员函数,这种情况下,第二个成员函数也会试图锁住互斥量,这就会导致未定义行为的发生。“变通的”解决方案会将互斥量转为嵌套锁,第二个成员函数就能成功的进行上锁,并且函数能继续执行。