04 同步操作 | 《C++ Concurrency In Action》笔记

本系列笔记参考《C++ Concurrency in Action, 2nd Edition》及其中文翻译

等待事件或条件

等待条件达成

C++ 标准库对条件变量有两套实现:std::condition_variable 和 std::condition_variable_any,这两个实现都包含在 condition_variable.h 头文件的声明中。两者都需要与互斥量一起才能工作,前者仅能与 std::mutex 一起工作,而后者可以和合适的互斥量一起工作,从而加上了 _any 的后缀。因为std::condition_variable_any 更加通用,不过在性能和系统资源的使用方面会有更多的开销,所以通常会将 std::condition_variable 作为首选类型。当对灵活性有要求时,才会考虑 std::condition_variable_any。

下面的代码是简单的生产者-消费者示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;

void data_preparation_thread() {
while(more_data_to_prepare()) {
data_chunk const data = prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
}

void data_processing_thread() {
while (true) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, []{return !data_queue.empty();});
data_chunk data = data_queue.front();
data_queue.pop();
lk.unlock();
process(data);
if (is_last_chunk(data)) {
break;
}
}
}

首先,队列中中有两个线程,两个线程之间会对数据进行传递。数据准备好时,使用 std::lock_guard 锁定队列,将准备好的数据压入队列之后,线程会对队列中的数据上锁,并调用 std::condition_variable 的 notify_one() 成员函数,对等待的线程进行通知。另外的一个线程正在处理数据,线程首先对互斥量上锁。之后会调用 std::condition_variable 的成员函数 wait(),传递一个锁和一个 lambda 表达式。

wait() 会通过 lambda 函数去检查这些条件,当条件满足(lambda 函数返回 true)时返回并继续持有锁。如果条件不满足(lambda 函数返回 false),wait() 将解锁互斥量,并且将线程(处理数据的线程)置于阻塞或等待状态。当准备数据的线程调用 notify_one() 通知条件变量时,处理数据的线程从睡眠中苏醒,重新获取互斥锁,并且再次进行条件检查。在条件满足的情况下,从 wait() 返回并继续持有锁。当条件不满足时,线程将对互斥量解锁,并重新等待。这就是为什么用 std::unique_lock 而不使用 std::lock_guard 的原因——等待中的线程必须在等待期间解锁互斥量,并对互斥量再次上锁,而 std::lock_guard 没有这么灵活。如果互斥量在线程休眠期间保持锁住状态,准备数据的线程将无法锁住互斥量,也无法添加数据到队列中。同样,等待线程也永远不会知道条件何时满足。

虚假唤醒(spurious wakeup)

虚假唤醒的概念:多个线程同时等待同一个条件满足时,当条件变量唤醒线程时,可能会唤醒多个线程,但是如果对应的资源只有一个线程能获得,其余线程就无法获得该资源,因此其余线程的唤醒是无意义的(有时甚至是有危害的),其余线程的唤醒则被称为虚假唤醒。

上述的危害是指条件变量即使没有被 notify,它也可能被允许返回。这是由于为了给操作系统提供处理错误情况和(线程)竞争实现(更大的)灵活性。幸运的是,linux 中的 pthread 则保证这种情况不会发生(https://en.wikipedia.org/wiki/Spurious_wakeup)。

构建线程安全队列

忽略掉类的构造、赋值以及交换操作,队列还有三组操作:

  1. 对整个队列的状态进行查询:empty() 和 size()
  2. 查询在队列中的各个元素:front() 和 back()
  3. 修改队列的操作:push()、pop() 和 emplace()

和之前提到的栈一样,也会遇到接口上的条件竞争。因此,需要将 front() 和 pop()合并成一个函数调用,就像之前在栈实现时合并 top() 和 pop()一样。当队列在多个线程中传递数据时,接收线程通常需要等待数据的压入。这里提供 pop() 函数的两个变种:try_pop() 和 wait_and_pop()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue {
private:
// 因为锁住互斥量是个可变操作,所以互斥量成员必须为 mutable 才能进行上锁
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
threadsafe_queue(threadsafe_queue const& other) {
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}

void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}

std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty()) {
return false;
}
value = data_queue.front();
data_queue.pop();
return true;
}

std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty()) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

std::future

std::async

假设有一个需要长时间的运算,需要计算出一个有效值,但并不迫切需要这个值。你可以启动新线程来执行这个计算,你需要计算的结果,但是又无法获取 std::thread 执行的任务的返回值。这里就需要 std::async 函数模板。

当不着急让任务结果时,可以使用 std::async 启动一个异步任务。与 std::thread 对象等待的方式不同,std::async 会返回一个 std::future 对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的 get() 成员函数,就会阻塞线程直到 future 为就绪为止,并返回计算结果。

1
2
3
4
5
6
7
8
9
10
11
#include <future>
#include <iostream>

int find_the_answer_to_ltuae();
void do_other_stuff();

int main() {
std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout << "The answer is " << the_answer.get() << std::endl;
}

我们还可以在函数调用之前向 std::async 传递一个额外参数,这个参数的类型是 std::launch,是函数的启动策略,它控制 std::async 的异步行为。有三种不同的启动策略:

  1. std::launch::async:保证异步行为,即函数必须在单独的线程中执行。
  2. std::launch::deferred:函数调用延迟到其他线程调用 wait() 或 get() 时才同步执行。
  3. std::launch::async | std::launch::deferred:默认行为,有了这个启动策略,它可以异步运行或不运行,这取决于系统的负载,即有可能创建一个线程运行函数,也有可能不运行直到有线程线程调用 wait() 或 get()。

使用 std::async 会将算法分割到各个任务中,这样程序就能并发了。不过,这不是让 std::future 与任务实例相关联的唯一方式,也可以将任务包装入 std::packaged_task<> 中,或通过编写代码的方式,使用 std::promise<> 模板显式设置值。与 std::promise<> 相比,std::packaged_task<> 具有更高的抽象,所以我们从“高抽象”模板说起。

std::promise

其作用是在一个线程 t1 中保存一个类型 typename T 的值,可供相绑定的 std::future 对象在另一线程 t2 中获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <future>
#include <chrono>

void ThreadFun1(std::promise<int> &p) {
int iVal = 233;
p.set_value(iVal);
}

void ThreadFun2(std::future<int> &f) {
// 阻塞函数,直到收到相关联的 std::promise 对象传入的数据
auto iVal = f.get();
std::cout << "收到数据(int):" << iVal << std::endl;
}

int main() {
std::promise<int> pr1;
std::future<int> fu1 = pr1.get_future();

std::thread t1(ThreadFun1, std::ref(pr1));
std::thread t2(ThreadFun2, std::ref(fu1));

t1.join();
t2.join();
}

std::packaged_task

其实 std::packaged_task 和 std::promise 非常相似,简单来说 std::packaged_task 是对 std::promise<T=std::function> 中 T=std::function 这一可调对象进行了包装,简化了使用方法。并将这一可调对象的返回结果传递给关联的 std::future 对象。

例子 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <future>
#include <chrono>
#include <functional>

int TestFun(int a, int b, int &c) {
std::this_thread::sleep_for(std::chrono::seconds(5));
c = a + b + 230;
return c;
}

int main() {
std::packaged_task<int(int, int, int&)> pt1(TestFun);
std::future<int> fu1 = pt1.get_future();

int c = 0;
std::thread t1(std::move(pt1), 1, 2, std::ref(c));

// 阻塞至线程 t1 结束(函数 TestFun 返回结果)
int iResult = fu1.get();

return 0;
}

例子 2

很多图形架构需要特定的线程去更新界面,所以当线程对界面更新时,需要发出一条信息给正确的线程,让相应的线程来做界面更新。std::packaged_task 提供了这种功能,且不需要发送一条自定义信息给图形界面线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>

std::mutex m;
std::deque<std::packaged_task<void()> > tasks;

bool gui_shutdown_message_received();
void get_and_process_gui_message();

void gui_thread() {
// 图形界面线程循环直到收到一条关闭图形界面的信息后关闭界面
while(!gui_shutdown_message_received()) {
// 关闭界面前,进行轮询界面消息处理,例如:用户点击和执行在队列中的任务
get_and_process_gui_message();
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
// 当队列中没有任务时,循环将继续
if(tasks.empty()) {
continue;
}
// 在队列中提取出一个任务,释放队列上的锁
task = std::move(tasks.front());
tasks.pop_front();
}
// 执行任务
task();
}
}

std::thread gui_bg_thread(gui_thread);

template<typename Func>
std::future<void> post_task_for_gui_thread(Func f) {
// 提供一个打包好的任务
std::packaged_task<void()> task(f);
// 调用 get_future() 成员函数获取 future 对象
std::future<void> res = task.get_future();
std::lock_guard<std::mutex> lk(m);
// 将任务推入列表
tasks.push_back(std::move(task));
// 返回 future 对象
return res;
}

std::shared_future

std::shared_future 可以让多个线程等待同一个事件。因为 std::future 是只移动的,所以其所有权可以在不同的实例中互相传递,但只有一个实例可以获得特定的同步结果,而 std::shared_future 实例是可拷贝的,所以多个对象可以引用同一关联期望值的结果。

每一个 std::shared_future 的独立对象上,成员函数调用返回的结果还是不同步的,所以为了在多个线程访问一个独立对象时避免数据竞争,必须使用锁来对访问进行保护。优先使用的办法:可以让每个线程都拥有自己对应的拷贝对象。这样,当每个线程都通过自己拥有的 std::shared_future 对象获取结果,那么多个线程访问共享同步结果就是安全的。

可能会使用 std::shared_future 的场景:复杂的电子表格的并行执行。每一个单元格有唯一终值,终值通过公式计算得到,这个终值可能依赖其他单元格的终值。可以使用 std::shared_future 对象引用上一个单元格的数据。当每个单元格内的所有公式并行执行后,任务会以期望的方式完成工作。当其中有计算需要依赖其他单元格的值时就会阻塞,直到依赖单元格的数据准备就绪。这可以让系统在最大程度上使用硬件并发。

简化代码

函数式编程

函数式编程(functional programming)是一种编程方式,函数结果只依赖于传入函数的参数。使用相同的参数调用函数,不管多少次都会获得相同的结果。C++ 标准库中与数学相关的函数都有这个特性。纯粹的函数不会改变任何外部状态,并且这种特性限制了函数的返回值。不修改共享数据,就不存在条件竞争,并且没有必要使用互斥量保护共享数据。这是对编程极大的简化,例如 Haskell 语言中所有函数默认都是“纯粹的”。

命令式编程与函数式编程

函数式编程与命令式编程最大的不同其实在于:函数式编程关心数据的映射,命令式编程关心解决问题的步骤。

以二叉树反转为例,命令式编程的代码是这样的:

1
2
3
4
5
6
def invertTree(root):
if root is None:
return None
root.left = invertTree(root.left)
root.right = invertTree(root.right)
return root

它的含义是:首先判断节点是否为空;然后翻转左树;然后翻转右树;最后左右互换。这就是命令式编程——你要做什么事情,你得把达到目的的步骤详细的描述出来,然后交给机器去运行。

函数式思维提供了另一种思维的途径——所谓“翻转二叉树”,可以看做是要得到一颗和原来二叉树对称的新二叉树,这颗新二叉树的特点是每一个节点都递归地和原树相反。

1
2
3
4
5
def invert(node):
if node is None:
return None
else
return Tree(node.value, invert(node.right), invert(node.left))

这段代码最终达到的目的同样是翻转二叉树,但是它得到结果的方式和命令式的代码有着本质的差别:通过描述一个旧树->新树的映射,而不是描述“旧树得到新树应该怎样做”来达到目的。

快速排序:FP 模式版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input) {
if(input.empty()) {
return input;
}

// 将输入的首个元素放入结果列表中
std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();

// 将序列中的值分成小于“中间”值的组和大于“中间”值的组
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t){ return t < pivot; });

// 选择了 FP 模式的接口,要使用递归对两部分排序,所以需要创建两个列表
// 将 input 列表小于 divided_point 的值移动到新列表 lower_part 中,其他数继续留在 input 列表中
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

auto new_lower = sequential_quick_sort(std::move(lower_part)));
auto new_higher = sequential_quick_sort(std::move(input)));

result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower);
return result;
}

注:

  • void splice(iterator position, list<T> &x, iterator it); 只会把 it 的值剪接到要操作的 list 对象中
  • void splice(iterator position, list<T> &x, iterator first, iterator last); 把 first 到 last 剪接到要操作的 list 对象中
  • std::partition 会对列表进行重置,并返回指向首元素(中间值)的迭代器

快速排序:FP 模式并行版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
if(input.empty()) {
return input;
}

std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();

auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t){ return t < pivot; });

std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

// 当前线程不对小于中间值部分的列表进行排序,std::async 会使用另一线程对列表进行排序
std::future<std::list<T>> new_lower = std::async(&parallel_quick_sort<T>, std::move(lower_part));

// 大于部分如同之前一样,使用递归进行排序
auto new_higher = parallel_quick_sort(std::move(input));

result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}

通过递归调用 parallel_quick_sort(),可以使用硬件并发。std::async 会启动一个新线程,这样当递归三次时,就会有八个线程在运行了。当递归十次,如果硬件能处理这十次递归调用,将会创建 1024 个执行线程。当运行库认为产生了太多的任务时(也许是因为数量超过了硬件并发的最大值),为了避免任务传递的开销,这些任务会在使用 get() 获取结果的线程上运行,而不是在新线程上运行,这也符合 std::async 的行为。

函数化编程可算作是并发编程的范型,并且也是通讯顺序进程(CSP,Communicating Sequential Processer)的范型,这里的线程没有共享数据,但有通讯通道允许信息在不同线程间进行传递。这种范型被 Erlang 语言所采纳,并且常用在 MPI(Message Passing Interface)上做高性能运算。

参与者模式

在参与者模式中,系统中有很多独立的(运行在一个独立的线程上)参与者,这些参与者会互相发送信息,去执行手头上的任务,并且不会共享状态,除非是通过信息直接传入的。一个并发系统中,这种编程方式可以极大的简化任务的设计,因为每一个线程都完全被独立对待。因此,使用多线程去分离关注点时,需要明确线程之间的任务应该如何分配。

参考

[C++11]std::promise介绍及使用
[C++11]std::packaged_task介绍及使用
什么是函数式编程思维?

04 同步操作 | 《C++ Concurrency In Action》笔记

http://www.zh0ngtian.tech/posts/9db99808.html

作者

zhongtian

发布于

2021-03-17

更新于

2023-12-16

许可协议

评论