03 共享数据 | 《C++ Concurrency In Action》笔记

本系列笔记参考《C++ Concurrency in Action, 2nd Edition》及其中文翻译

使用互斥量

不推荐直接去调用 std::mutex 的成员函数,调用其成员函数就意味着必须在每个函数出口都要去调用 unlock()(包括异常的情况)。C++ 标准库为互斥量提供了 RAII 模板类 std::lock_guard,在构造时就能提供已锁的互斥量,并在析构时进行解锁,从而保证了互斥量能被正确解锁。某些情况下使用全局变量没问题,但大多数情况下,互斥量通常会与需要保护的数据放在同一类中,而不是定义成全局变量。

当然,也不是总能那么理想:当其中一个成员函数返回的是保护数据的指针或引用时,也会破坏数据。具有访问能力的指针或引用可以访问(并可能修改)保护数据,而不会被互斥锁限制。这就需要对接口谨慎设计,要确保互斥量能锁住数据访问,并且不留后门。

接口间的条件竞争

互斥量可以保证多个线程依次访问同一份数据,但是即便如此,仍然会存在竞争问题。以 std::stack 的 top() 和 pop() 接口为例,当一个线程依次调用这两个接口的间隙,另一个线程调用了 pop(),这样前一个线程 pop 出的数据就不是它之前通过 top 获得的,出现了错误。

先说一下为什么标准库的 stack 为什么会把这两个接口分开而不是做在一起。如果这两个功能做在一起,流程就是先移除栈顶元素,再 return 返回该元素。return 的时候会发生元素的拷贝,如果此时拷贝构造函数抛出了异常,那么该栈顶元素就发生了丢失。

为了保证安全拷贝数据,需要将此接口分割。不幸的是,这样的分割却制造了本想避免的条件竞争。幸运的是,我们还有的别的选项,但使用每个选项都有相应的代价。

选项 1:传入一个引用

1
2
std::vector<int> result;
some_stack.pop(result);

这种方法的缺点也比较明显:

  • 需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算;对于另外一些类型,这样也不总行得通,因为构造函数需要的参数,在这个阶段不一定可用。
  • 需要可赋值的存储类型,这是一个重大限制。即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。

选项 2:无异常抛出的拷贝构造函数或移动构造函数

尽管能在编译时可使用 std::is_nothrow_copy_constructiblestd::is_nothrow_move_constructible,让拷贝或移动构造函数不抛出异常,但是这种方式的局限性太强。用户自定义的类型中,会有不抛出异常的拷贝构造函数或移动构造函数的类型,但是那些有抛出异常的拷贝构造函数的类型往往更多。

选项 3:返回指向弹出值的指针

指针的优势是自由拷贝,并且不会产生异常,这样就能避免前面提到的异常问题了。缺点就是返回指针需要对对象的内存分配进行管理,对于简单数据类型,内存管理的开销要远大于直接返回值。对于这个方案,使用智能指针是个不错的选择,不仅能避免内存泄露,而且标准库能够完全控制内存分配方案,就不需要 new 和 delete 操作。

线程安全的堆栈

下面是一个接口没有竞争的堆栈,它实现了选项 1 和选项 3:重载了 pop(),使用局部引用去存储弹出值,并返回 std::shared_ptr 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <exception>
#include <memory>
#include <mutex>
#include <stack>

struct empty_stack: std::exception {
const char* what() const throw() {
return "empty stack!";
};
};

template<typename T>
class threadsafe_stack {
private:
std::stack<T> data;
mutable std::mutex m;

public:
threadsafe_stack() : data(std::stack<T>()) {}

threadsafe_stack(const threadsafe_stack& other) {
std::lock_guard<std::mutex> lock(other.m);
// 在构造函数体中的执行拷贝
data = other.data;
}

threadsafe_stack& operator=(const threadsafe_stack&) = delete;

void push(T new_value) {
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}

std::shared_ptr<T> pop() {
std::lock_guard<std::mutex> lock(m);
// 在调用pop前,检查栈是否为空
if(data.empty()) throw empty_stack();

// 在修改堆栈前,分配出返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}

void pop(T& value) {
std::lock_guard<std::mutex> lock(m);
if(data.empty()) {
throw empty_stack();
}
value=data.top();
data.pop();
}

bool empty() const {
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

死锁

两个线程试图通过不同的顺序获取多个相同的锁,如果请求的顺序不相同,那么会出现循环的锁依赖现象,产生死锁。这是最常见的死锁情况,如下:

1
2
3
4
5
6
thread A            thread B

lock (mutex1);
lock (mutex2);
lock (mutex2);
lock (mutex1);

避免死锁的一般建议,就是让这两个线程以相同的顺序获取这些锁。但是即便如此,也可能会有问题。例如 Swap(a, b) 需要访问需要同时访问 a、b 的内部数据,理所当然对 a、b 都加锁再访问,按常规思路首先对第一个参数加锁,然后对第二个参数加锁。当两个线程同时分别调用 Swap(a, b) 和 Swap(b, a) 时,就又出现了上述的“两个线程试图通过不同的顺序获取多个相同的锁”的情况,就有可能发生死锁。

C++ 标准库解决这个问题的方法是通过 std::lock 一次性锁住多个(两个以上)的互斥量。下面的程序代码演示了在一个简单的交换操作用使用 std::lock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#inlcude <mutex>

class some_big_object;

void Swap(some_big_object& lhs,some_big_object& rhs);

class X {
private:
some_big_object some_detail;
std::mutex m;

public:
X(some_big_object const& sd):some_detail(sd) {}

friend void swap(X& lhs, X& rhs) {
if (&lhs==&rhs) {
return;
}

// 尝试获取两个互斥量,获取某个互斥量失败时抛出异常并将另一个锁释放
// 即要么将两个锁都锁住,要不一个都不锁
std::lock(lhs.m, rhs.m);
// std::adopt_lock 表明当前线程已经获得了该互斥量,此后互斥量的解锁操作交由 lock_guard 对象来管理
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
swap(lhs.some_detail, rhs.some_detail);
}
};

C++17 对这种情况提供了支持,std::scoped_lock 是一种新的 RAII 模板类型,与 std::lock_guard 的功能相同,这个新类型能接受不定数量的互斥量类型作为模板参数,以及相应的互斥量(数量和类型)作为构造参数。Swap 操作可以重写如下:

1
2
3
4
5
6
7
8
9
10
void Swap(X& lhs, X& rhs) {
if(&lhs == &rhs) {
return;
}
// 这里使用了C++17的另一个特性:自动推导模板参数
// 下面这行代码相当于
// std::scoped_lock<std::mutex, std::mutex> guard(lhs.m, rhs.m);
std::scoped_lock guard(lhs.m, rhs.m);
swap(lhs.some_detail, rhs.some_detail);
}

避免死锁的进阶指导

避免嵌套锁

  • 线程获得一个锁时,就别再去获取第二个,因为每个线程只持有一个锁时不会产生死锁。
  • 当需要获取多个锁,使用 std::lock 来做这件事(对获取锁的操作上锁),避免产生死锁。

避免在持有锁时调用外部代码

因为代码是外部提供的,所以没有办法确定外部要做什么。外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反上一条指导意见,并造成死锁。当写通用代码时(例如前面提到的栈),每一个操作的参数类型,都是外部提供的定义,这就需要其他指导意见来帮助你了。

使用固定顺序获取锁

以链表为例,链表的每个节点都有一个互斥量保护。为了访问链表,线程必须获取感兴趣节点上的互斥锁。当一个线程删除一个节点,就必须获取三个节点上的互斥锁:将要删除的节点,两个邻接节点。为了遍历链表,线程必须保证在获取当前节点的互斥锁前提下,获得下一个节点的锁,要保证指向下一个节点的指针不会同时被修改。当下一个节点上的锁被获取,第一个节点的锁就可以释放了。

为了避免死锁,节点必须以固定的顺序上锁,如果两个线程试图用互为反向的顺序,执行到链表中间部分时会发生死锁。所以对于链表,避免出现死锁的方式就是不允许反向遍历链表。类似的约定常用于建立其他的数据结构。

使用层次锁

每一个互斥量都拥有一个层级号码,并且严格按照下面两个规则:

  1. 当某个线程已经持有层级为 N 的互斥量的时候,只能去获取层次小于 N 的互斥量
  2. 当试图同时占有多个同层级的互斥量的时候,这些互斥量必须一次性获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class hierarchical_mutex {
std::mutex internal_mutex;

// 每个互斥量的层次值
unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value;

// 这个变量标示该互斥量在每个线程正在被哪个层次的锁锁住
static thread_local unsigned long this_thread_hierarchy_value;

// 如果一个线程已经被一个低层次的锁锁住,又想去获取一个高层次锁,则抛出异常
void check_for_hierarchy_violation() {
if(this_thread_hierarchy_value <= hierarchy_value) {
throw std::logic_error(”mutex hierarchy violated”);
}
}

void update_hierarchy_value() {
previous_hierarchy_value = this_thread_hierarchy_value;
this_thread_hierarchy_value = hierarchy_value;
}

public:
explicit hierarchical_mutex(unsigned long value) : hierarchy_value(value), previous_hierarchy_value(0) {}

void lock() {
check_for_hierarchy_violation();
internal_mutex.lock();
update_hierarchy_value();
}

void unlock() {
// 为了避免无序解锁造成层次混乱,不是解锁最近上锁的那个互斥量,就需要抛出异常
if (this_thread_hierarchy_value != hierarchy_value) {
throw std::logic_error(“mutex hierarchy violated”);
}
this_thread_hierarchy_value = previous_hierarchy_value;
internal_mutex.unlock();
}

bool try_lock() {
check_for_hierarchy_violation();
if (!internal_mutex.try_lock()) {
return false;
}
update_hierarchy_value();
return true;
}
};

// 该值被初始化为最大值,这个值会大于其他任何值,所以可以被任何线程上锁
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

std::unique_lock——灵活的锁

std::unique_lock 拥有 lock、unlock 接口,比 std::lock_guard 更为灵活,可以随时上锁解锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class LogFile {
public:
LogFile() {
f.open("log.txt");
}

~LogFile() {
f.close();
}

void shared_print(string msg, int id) {
std::unique_lock<std::mutex> guard(_mu); // 上锁
//do something
guard.unlock(); // 临时解锁
//do something
guard.lock(); // 继续上锁
// do something
// 结束时析构 guard 会解锁,这句话可要可不要,不写析构的时候也会自动执行
guard.ulock();
}

private:
std::mutex mutex_;
std::ofstream f;
};

还可以使用 std::defer_lock 设置初始化的时候不进行默认的上锁操作:

1
2
3
4
5
6
7
8
9
10
void shared_print(string msg, int id) {
std::unique_lock<std::mutex> guard(_mu, std::defer_lock);
//do something
guard.lock();
// do something
guard.unlock();
//do something
guard.lock();
// do something
}

保护共享数据的初始化过程

下面的代码是一种比较常见的单线程初始化方式——lazy initialization:

1
2
3
4
5
6
7
8
std::shared_ptr<some_resource> resource_ptr;

void foo() {
if (!resource_ptr) {
resource_ptr.reset(new some_resource);
}
resource_ptr->do_something();
}

转为多线程代码时,只有初始化资源处需要保护,这样共享数据对于并发访问就是安全的。但是下面的实现会使得线程资源产生不必要的序列化,为了确定数据源已经初始化,每个线程必须等待互斥量。

1
2
3
4
5
6
7
8
9
10
11
12
13
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;

void foo() {
// 所有线程在此序列化
std::unique_lock<std::mutex> lk(resource_mutex);
if (!resource_ptr) {
// 只有初始化过程需要保护
resource_ptr.reset(new some_resource);
}
lk.unlock();
resource_ptr->do_something();
}

使用 double check 的方式可以避免没必要的串行化:

1
2
3
4
5
6
7
8
9
void undefined_behaviour_with_double_checked_locking() {
if (!resource_ptr) { // 1
std::lock_guard<std::mutex> lk(resource_mutex);
if(!resource_ptr) {
resource_ptr.reset(new some_resource); // 2
}
}
resource_ptr->do_something();
}

但是 double check 存在潜在的条件竞争:未被锁保护的读取操作 1 没有与其他线程里被锁保护的写入操作 2 进行同步,因此就会产生条件竞争,这个条件竞争不仅覆盖指针本身,还会影响到其指向的对象。

C++ 标准库提供了 std::once_flag 和 std::call_once 来处理这种情况。比起锁住互斥量并显式的检查指针,每个线程只需要使用 std::call_once 就可以。

1
2
3
4
5
6
7
8
9
10
11
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;

void init_resource() {
resource_ptr.reset(new some_resource);
}

void foo() {
std::call_once(resource_flag,init_resource);
resource_ptr->do_something();
}

还有一种初始化过程中潜存着条件竞争:其中一个局部变量为 static 类型,这种变量的在声明后就已经完成初始化。对于多线程调用的函数,这就意味着这里有条件竞争——抢着去定义这个变量。在 C++11 没有出来的时候,只能靠插入两个 memory barrier(内存屏障)来解决这个错误,但是 C++11 引进了 memory model,提供了 atomic 实现内存的同步访问,即不同线程总是获取对象修改前或修改后的值,无法在对象修改期间获得该对象。换句话说,C++11 规定了 local static 在多线程条件下的初始化行为,要求编译器保证了内部静态变量的线程安全性。

保护不常更新的数据结构

读写锁把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。C++17 开始,标准库提供了shared_mutex 类(在这之前,可以使用 boost::shared_mutex 类或系统相关 api)。shared_mutex 通常用于多个读线程能同时访问同一资源而不导致数据竞争,但只有一个写线程能访问的情形。

下面的代码清单展示了一个简单的 DNS 缓存,使用 std::map 持有缓存数据,使用 std::shared_mutex 进行保护。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>

class dns_entry;

class dns_cache {
public:
dns_entry find_entry(std::string const& domain) const {
std::shared_lock<std::shared_mutex> lk(entry_mutex);
std::map<std::string,dns_entry>::const_iterator const it = entries.find(domain);
return (it == entries.end()) ? dns_entry() : it->second;
}

void update_or_add_entry(std::string const& domain, dns_entry const& dns_details) {
std::lock_guard<std::shared_mutex> lk(entry_mutex);
entries[domain] = dns_details;
}

private:
std::map<std::string, dns_entry> entries;
mutable std::shared_mutex entry_mutex;
};

嵌套锁

线程对已经获取的 std::mutex 再次上锁是错误的,尝试这样做会导致未定义行为。因此,C++ 标准库提供了 std::recursive_mutex 类。除了可以在同一线程的单个实例上多次上锁,其他功能与 std::mutex 相同。其他线程对互斥量上锁前,当前线程必须释放拥有的所有锁,所以如果你调用 lock() 三次,也必须调用 unlock() 三次。正确使用 std::lock_guardstd::recursive_mutex 和 std::unique_lockstd::recursive_mutex 可以帮你处理这些问题。

使用嵌套锁时,要对代码设计进行改动。嵌套锁一般用在可并发访问的类上,所以使用互斥量保护其成员数据。每个公共成员函数都会对互斥量上锁,然后完成对应的操作后再解锁互斥量。不过,有时成员函数会调用另一个成员函数,这种情况下,第二个成员函数也会试图锁住互斥量,这就会导致未定义行为的发生。“变通的”解决方案会将互斥量转为嵌套锁,第二个成员函数就能成功的进行上锁,并且函数能继续执行。

03 共享数据 | 《C++ Concurrency In Action》笔记

http://www.zh0ngtian.tech/posts/839c234e.html

作者

zhongtian

发布于

2021-03-14

更新于

2023-12-16

许可协议

评论