08 并发设计 | 《C++ Concurrency In Action》笔记

本系列笔记参考《C++ Concurrency in Action, 2nd Edition》及其中文翻译

线程间划分工作

递归划分

快速排序有两个最基本的步骤:将数据划分到中枢元素之前或之后,然后对中枢元素之前和之后的两半数组再次进行快速排序。这里不能通过对数据的简单划分达到并行,因为只有在一次排序结束后,才能知道哪些项在中枢元素之前和之后。当要对这种算法进行并行化,很自然的会想到使用递归。每一级的递归都会多次调用 quick_sort 函数,因为需要知道哪些元素在中枢元素之前和之后。递归调用是完全独立的,因为其访问的是不同的数据集,并且每次迭代都能并发执行。

对一个很大的数据集进行排序时,每层递归都产生新线程,最后就会产生大量的线程。大量线程会对性能有影响,如果线程太多,应用将会运行的很慢。如果数据集过于庞大,会将资源耗尽。所以在递归的基础上进行任务的划分,就是一个不错的主意。只需要将一定数量的数据打包后,交给线程即可。std::async() 可以处理这种简单的情况。

另一种选择是使用 std::thread::hardware_concurrency() 函数来固定线程的数量。将已排序的数据推到线程安全的栈上。线程无所事事时,不是已经完成对自己数据块的梳理,就是在等待一组排序数据的产生。所以,线程可以从栈上获取这组数据,并且对其排序。下面的代码展示了使用栈的并行快速排序算法——等待数据块排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
template <typename T>
struct chunk_to_sort {
std::list<T> data;
std::promise<std::list<T>> promise;
};

template <typename T>
struct sorter {
thread_safe_stack<chunk_to_sort<T>> chunks; // 支持在栈上简单的存储无序数据块
std::vector<std::thread> threads; // 支持对线程进行设置
unsigned const max_thread_count;
std::atomic<bool> end_of_data;

sorter()
: max_thread_count(std::thread::hardware_concurrency() - 1),
end_of_data(false) {}

~sorter() {
end_of_data = true;
for (unsigned i = 0; i < threads.size(); ++i) {
threads[i].join();
}
}

void try_sort_chunk() {
std::shared_ptr<chunk_to_sort<T>> chunk = chunks.pop();
if (chunk) {
sort_chunk(chunk);
}
}

void sort_chunk(std::shared_ptr<chunk_to_sort<T>> const& chunk) {
// 将结果存在 promise 中,让线程对已存在于栈上的数据块进行提取
chunk->promise.set_value(do_sort(chunk->data));
}

std::list<T> do_sort(std::list<T>& chunk_data) {
if (chunk_data.empty()) {
return chunk_data;
}

std::list<T> result;
// 将 chunk_data 中的由迭代器 chunk_data.begin() 指向的元素移到 result.begin() 处
result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_val = *result.begin();

typename std::list<T>::iterator divide_point =
std::partition(chunk_data.begin(), chunk_data.end(),
[&](T const& val) { return val < partition_val; });

chunk_to_sort<T> new_lower_chunk;
// 将 chunk_data.begin() 和 divide_point 之间的数据移到 new_lower_chunk.data.end() 处
new_lower_chunk.data.splice(new_lower_chunk.data.end(), chunk_data,
chunk_data.begin(), divide_point);

std::future<std::list<T>> new_lower = new_lower_chunk.promise.get_future();
chunks.push(std::move(new_lower_chunk)); // 将数据块推到栈
if (threads.size() < max_thread_count) { // 产生新线程对栈中数据进行排序
threads.push_back(std::thread(&sorter<T>::sort_thread, this));
}

// 对 new_higher 进行排序
std::list<T> new_higher(do_sort(chunk_data));

// 将 new_higher 指向的数据移到 result.end() 处
result.splice(result.end(), new_higher);

// 小于部分的数据块由其他线程进行处理,需要等待线程完成
while (new_lower.wait_for(std::chrono::seconds(0)) !=
std::future_status::ready) {
try_sort_chunk();
}

// 将 new_lower 指向的数据移到 result.begin() 处
result.splice(result.begin(), new_lower.get());
return result;
}

void sort_thread() {
while (!end_of_data) {
try_sort_chunk();
std::this_thread::yield();
}
}
};

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
if (input.empty()) {
return input;
}
sorter<T> s;

return s.do_sort(input);
}

这个方案使用到了特殊的线程池——所有线程任务都来源于一个等待链表,然后线程会去完成任务,任务完成后会再来链表提取任务。这个方案中,不会递归产生无数线程,也不用再依赖 C++ 线程库,这里可以选择执行线程的数量。

任务几种划分方法:处理前划分和递归划分(都需要事先知道数据的长度固定),还有上面的划分方式。事情并非总是这样好解决,当数据是动态生成或是通过外部输入,那这里的办法就不适用了。这种情况下,基于任务类型的划分方式,就要好于基于数据的划分方式。

通过任务类型划分

虽然会为每个线程分配不同的数据块,因为这里每个线程对每个数据块的操作是相同的。另一种选择是让线程做专门的工作,就是每个线程做不同的工作。线程可能会对同一段数据进行操作,但对数据进行不同的操作。

对分工的排序,也就是分离关注点。每个线程都有不同的任务,这意味着真正意义上的线程独立。其他线程偶尔会向特定线程交付数据,或是通过触发事件的方式来进行处理。不过总体而言,每个线程只需要关注自己所要做的事情即可。其本身就是良好的设计,每一段代码只对自己的部分负责。

分离关注

当有多个任务需要持续运行一段时间,或需要及时进行处理的事件(比如,按键事件或传入网络数据),且还有其他任务正在运行时,单线程应用采用的是单责任原则处理冲突。单线程中代码会执行任务 A 后,再去执行任务 B,再检查按钮事件,再检查传入的网络包,然后在循环回去,执行任务 A。这将会使得任务 A 复杂化,因为需要存储完成状态,以及定期从主循环中返回。如果在循环中添加了很多任务,那么程序将运行的很慢。并且用户会发现,在按下按键后,很久之后才会有反应。

当使用独立线程执行任务时,操作系统会帮忙处理接口问题。执行任务 A 时,线程可以专注于执行任务,而不用为保存状态从主循环中返回。操作系统会自动保存状态,当需要的时候将线程切换到任务 B 或任务 C。如果目标系统是带有多核或多处理器,任务 A 和任务 B 可很可能真正的并发执行。处理按键时间或网络包的代码,就能及时执行了。所有事情都完成的很好,用户得到了及时的响应。当然,作为开发者只需要写具体操作的代码即可,不用将控制分支和用户交互混在一起了。

当通过任务类型对线程间的任务进行划分时,不应该让线程处于隔离态。当多个输入数据集需要使用同样的操作序列,可以将序列中的操作分成多个阶段让线程执行。

划分任务序列

当任务会应用到相同操作序列,去处理独立的数据项时,就可以使用流水线模式进行并发。

使用这种方式划分工作,可以为流水线中的每一阶段操作创建一个独立线程。当一个操作完成,数据元素会放在队列中,供下一阶段的线程使用。这就允许第一个线程在完成对于第一个数据块的操作时,第二个线程可以对第一个数据块执行管线中的第二个操作。

并发代码的性能

处理器数量

C++ 标准线程库提供 std::thread::hardware_concurrency(),使用这个函数就能知道在给定硬件上可以扩展的线程数量了。使用 std::thread::hardware_concurrency() 需要谨慎,因为不会考虑其他应用已使用的线程数量(除非已经将系统信息进行共享)。std::async() 可以避免这个问题,标准库会对所有调用进行安排。

数据争用与乒乓缓存

当两个线程在不同处理器上时,对同一数据进行读取,通常不会出现问题。因为数据会拷贝到每个线程的缓存中,并让两个处理器同时进行处理。当有线程对数据进行修改,并且需要更新到其他核芯的缓存中去,就要耗费一定的时间。这样的修改可能会让第二个处理器停下来,等待内存更新缓存中的数据,这是一个特别特别慢的操作。

对于下面这段代码:

1
2
3
4
5
6
std::atomic<unsigned long> counter(0);
void processing_loop() {
while(counter.fetch_add(1, std::memory_order_relaxed) < 100000000) {
do_something();
}
}

counter 变量是全局的,任何线程都能调用 processing_loop()。因此,每次对 counter 进行增量操作时,处理器必须确保缓存中的 counter 是最新值,然后进行修改,再告知其他处理器。fetch_add 是一个“读-改-写”操作,因此要对最新的值进行检索。如果另一个线程在另一个处理器上执行同样的代码,counter 的数据需要在两个处理器之间进行传递。如果 do_something() 足够短,或有很多处理器来对这段代码进行处理时,处理器会互相等待。一个处理器准备更新这个值,另一个处理器在修改这个值,所以该处理器就需要等待第二个处理器更新完成,并且完成更新传递时才能执行更新,这种情况被称为高竞争(high contention),如果处理器很少需要互相等待就是低竞争(low contention)。

循环中 counter 的数据将在每个缓存中传递若干次,这就是乒乓缓存(cache ping-pong),这会对应用的性能有着重大的影响。当处理器因为等待缓存转移而停止运行时,这个处理器就不能做任何事情。

如何避免乒乓缓存呢?答案就是:减少两个线程对同一个内存位置的竞争。

虽然,要实现起来并不简单。即使给定内存位置,因为伪共享(false sharing)可能还是会有乒乓缓存。

伪共享

处理器处理缓存的单位是缓存行(cache lines),如果两个数据在内存上距离比较近,那么它们就有可能位于同一个缓存行。在这种情况下,即使多线程同时访问不同的内存地址,也有可能出现乒乓缓存。

C++17 标准在头文件 中定义了 std::hardware_destructive_interference_size,它指定了当前编译目标可能共享的连续字节的最大数目。如果确保数据间隔大于等于这个字节数,就不会有错误的共享存在了。

在实践中设计并发代码

并行版 std::for_each

std::for_each 的原理很简单:其对某个范围中的元素,依次使用用户提供的函数。不需要返回结果,要将异常传递给调用者,需要使用 std::packaged_task 和 std::future 对线程中的异常进行转移。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class join_threads {
std::vector<std::thread>& threads;

public:
explicit join_threads(std::vector<std::thread>& threads_) : threads(threads_) {}
~join_threads() {
for (unsigned long i = 0; i < threads.size(); ++i) {
if (threads[i].joinable()) threads[i].join();
}
}
};

template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
unsigned long const length = std::distance(first, last);

if (!length) {
return;mtle
}

unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;

// future 向量对 std::future<void> 类型变量进行存储,因为工作线程不会返回值
std::vector<std::future<void>> futures(num_threads - 1);

std::vector<std::thread> threads(num_threads - 1);
join_threads joiner(threads);

Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<void(void)> task([=]() { std::for_each(block_start, block_end, f); });
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task));
block_start = block_end;
}
std::for_each(block_start, last, f);
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
// 工作线程不需要返回值时,这一步只为了检索工作线程的异常。如果不想把异常传递出去,可以省略这一步
futures[i].get();
}
}

实现并行 std::accumulate 时,使用 std::async 会简化代码。同样,parallel_for_each 也可以使用 std::async。因为不知道需要使用多少个线程,所以在运行时对数据进行迭代划分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
unsigned long const length = std::distance(first, last);

if (!length) return;

unsigned long const min_per_thread = 25;

if (length < (2 * min_per_thread)) {
std::for_each(first, last, f); // 1
} else {
Iterator const mid_point = first + length / 2;
// 将每一级的数据分成两部分,异步执行另外一部分
std::future<void> first_half = std::async(&parallel_for_each<Iterator, Func>, first, mid_point, f);
parallel_for_each(mid_point, last, f);
first_half.get(); // 传播异常
}
}

并行版 std::find

因为不需要对数据元素做处理,所以当第一个元素就满足查找标准时,就没有必要对剩余元素进行搜索了。中断其他线程的一个办法就是使用原子变量作为标识,处理过每一个元素后就对这个标识进行检查。如果标识设置了,就意味有线程找到了匹配元素,算法就可以停止并返回。用这种方式来中断线程就可以将没有处理的数据保持原样,并在更多的情况下,比串行运行性能上能提升很多。缺点就是,加载原子变量是一个很慢的操作,会阻碍线程的运行。

获取返回值和异常有两个选择:

  • 使用一个 future 数组,使用 std::packaged_task 来转移值和异常,主线程上对返回值和异常进行处理
  • 使用 std::promise 对工作线程上的最终结果直接进行设置

这完全依赖于怎么样处理工作线程上的异常:如果想当第一个异常出现时停止(即使还没有对所有元素进行处理),就可以使用 std::promise 对异常和最终值进行设置;如果想让其他工作线程继续查找,可以使用std::packaged_task 来存储异常,当线程没有找到匹配元素时异常将再次抛出。一般情况下会选择前者,因为其行为和 std::find 更为接近。

对于下面的代码,当线程调用 find_element 查询一个值,或者抛出一个异常时,如果其他线程看到 done_flag 已设置,其他线程会终止。如果多线程同时找到匹配值或抛出异常,将会产生竞争。不过,这是良性的条件竞争,成功的竞争者会作为“第一个”返回线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
template <typename Iterator, typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match) {
struct find_element {
void operator()(Iterator begin, Iterator end, MatchType match,
std::promise<Iterator>* result,
std::atomic<bool>* done_flag) {
try {
// 循环通过在给定数据块中的元素,检查每一步的标识
for (; (begin != end) && !done_flag->load(); ++begin) {
if (*begin == match) {
// 如果匹配的元素被找到,就将最终的结果设置到 promise 当中,并且在返回前对 done_flag 进行设置
result->set_value(begin);
done_flag->store(true);
return;
}
}
} catch (...) {
try {
result->set_exception(std::current_exception());
done_flag->store(true);
} catch (...) {
}
}
}
};

unsigned long const length = std::distance(first, last);

if (!length) {
return last;
}

unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;

unsigned long const hardware_threads = std::thread::hardware_concurrency();

unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);

unsigned long const block_size = length / num_threads;

std::promise<Iterator> result;
std::atomic<bool> done_flag(false);
std::vector<std::thread> threads(num_threads - 1);
{
join_threads joiner(threads);

Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
// 随着对元素的查找,promise 和标识会传递到新线程中
threads[i] = std::thread(find_element(), block_start, block_end, match, &result, &done_flag);
block_start = block_end;
}
// 主线程也使用 find_element 对剩下的元素进行查找
find_element()(block_start, last, match, &result, &done_flag);
}

// 如果没有找到且没有出现异常,会将最后一个元素进行返回
if (!done_flag.load()) {
return last;
}

// 如果找到匹配元素或出现异常,就可以调用 std::future<Iterator>.get() 来获取返回值或异常
return result.get_future().get();
}

不过,假设使用硬件上所有可用的的线程,或使用其他机制对线程上的任务进行提前划分,可以使用 std::async 以及递归数据划分的方式来简化实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
template <typename Iterator, typename MatchType>
// done 表示匹配项已经找到,该标示可以在线程间传递
Iterator parallel_find_impl(Iterator first, Iterator last, MatchType match, std::atomic<bool>& done) {
try {
unsigned long const length = std::distance(first, last);
unsigned long const min_per_thread = 25;
if (length < (2 * min_per_thread)) {
// 如果数据块大小不足以分成两半,就要让当前线程完成所有的工作
for (; (first != last) && !done.load(); ++first) {
if (*first == match) {
done = true;
return first;
}
}
return last;
} else {
Iterator const mid_point = first + (length / 2);
std::future<Iterator> async_result =
std::async(&parallel_find_impl<Iterator, MatchType>, mid_point, last, match, std::ref(done));
Iterator const direct_result = parallel_find_impl(first, mid_point, match, done);
// 如果查找返回的是 mid_point,这就意味着没有找到匹配项,所以就要从异步查找中获取结果
// 如果在另一半中没有匹配项的话,返回的结果就一定是 last,这个值的返回就代表了没有找到匹配的元素
return (direct_result == mid_point) ? async_result.get() : direct_result;
}
} catch (...) {
done = true;
throw;
}
}

template <typename Iterator, typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match) {
std::atomic<bool> done(false);
return parallel_find_impl(first, last, match, done);
}

并行版 std::partial_sum

08 并发设计 | 《C++ Concurrency In Action》笔记

http://www.zh0ngtian.tech/posts/3fed003.html

作者

zhongtian

发布于

2021-06-06

更新于

2023-12-16

许可协议

评论