本系列笔记参考《C++ Concurrency in Action, 2nd Edition》 及其中文翻译 。
线程间划分工作 递归划分 快速排序有两个最基本的步骤:将数据划分到中枢元素之前或之后,然后对中枢元素之前和之后的两半数组再次进行快速排序。这里不能通过对数据的简单划分达到并行,因为只有在一次排序结束后,才能知道哪些项在中枢元素之前和之后。当要对这种算法进行并行化,很自然的会想到使用递归。每一级的递归都会多次调用 quick_sort 函数,因为需要知道哪些元素在中枢元素之前和之后。递归调用是完全独立的,因为其访问的是不同的数据集,并且每次迭代都能并发执行。
对一个很大的数据集进行排序时,每层递归都产生新线程,最后就会产生大量的线程。大量线程会对性能有影响,如果线程太多,应用将会运行的很慢。如果数据集过于庞大,会将资源耗尽。所以在递归的基础上进行任务的划分,就是一个不错的主意。只需要将一定数量的数据打包后,交给线程即可。std::async() 可以处理这种简单的情况。
另一种选择是使用 std::thread::hardware_concurrency() 函数来固定线程的数量。将已排序的数据推到线程安全的栈上。线程无所事事时,不是已经完成对自己数据块的梳理,就是在等待一组排序数据的产生。所以,线程可以从栈上获取这组数据,并且对其排序。下面的代码展示了使用栈的并行快速排序算法——等待数据块排序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 template <typename T>struct chunk_to_sort { std::list<T> data; std::promise<std::list<T>> promise; }; template <typename T>struct sorter { thread_safe_stack<chunk_to_sort<T>> chunks; std::vector<std::thread> threads; unsigned const max_thread_count; std::atomic<bool > end_of_data; sorter () : max_thread_count (std::thread::hardware_concurrency () - 1 ), end_of_data (false ) {} ~sorter () { end_of_data = true ; for (unsigned i = 0 ; i < threads.size (); ++i) { threads[i].join (); } } void try_sort_chunk () { std::shared_ptr<chunk_to_sort<T>> chunk = chunks.pop (); if (chunk) { sort_chunk (chunk); } } void sort_chunk (std::shared_ptr<chunk_to_sort<T>> const & chunk) { chunk->promise.set_value (do_sort (chunk->data)); } std::list<T> do_sort (std::list<T>& chunk_data) { if (chunk_data.empty ()) { return chunk_data; } std::list<T> result; result.splice (result.begin (), chunk_data, chunk_data.begin ()); T const & partition_val = *result.begin (); typename std::list<T>::iterator divide_point = std::partition (chunk_data.begin (), chunk_data.end (), [&](T const & val) { return val < partition_val; }); chunk_to_sort<T> new_lower_chunk; new_lower_chunk.data.splice (new_lower_chunk.data.end (), chunk_data, chunk_data.begin (), divide_point); std::future<std::list<T>> new_lower = new_lower_chunk.promise.get_future (); chunks.push (std::move (new_lower_chunk)); if (threads.size () < max_thread_count) { threads.push_back (std::thread (&sorter<T>::sort_thread, this )); } std::list<T> new_higher (do_sort(chunk_data)) ; result.splice (result.end (), new_higher); while (new_lower.wait_for (std::chrono::seconds (0 )) != std::future_status::ready) { try_sort_chunk (); } result.splice (result.begin (), new_lower.get ()); return result; } void sort_thread () { while (!end_of_data) { try_sort_chunk (); std::this_thread::yield (); } } }; template <typename T>std::list<T> parallel_quick_sort (std::list<T> input) { if (input.empty ()) { return input; } sorter<T> s; return s.do_sort (input); }
这个方案使用到了特殊的线程池——所有线程任务都来源于一个等待链表,然后线程会去完成任务,任务完成后会再来链表提取任务。这个方案中,不会递归产生无数线程,也不用再依赖 C++ 线程库,这里可以选择执行线程的数量。
任务几种划分方法:处理前划分和递归划分(都需要事先知道数据的长度固定),还有上面的划分方式。事情并非总是这样好解决,当数据是动态生成或是通过外部输入,那这里的办法就不适用了。这种情况下,基于任务类型的划分方式,就要好于基于数据的划分方式。
通过任务类型划分 虽然会为每个线程分配不同的数据块,因为这里每个线程对每个数据块的操作是相同的。另一种选择是让线程做专门的工作,就是每个线程做不同的工作。线程可能会对同一段数据进行操作,但对数据进行不同的操作。
对分工的排序,也就是分离关注点。每个线程都有不同的任务,这意味着真正意义上的线程独立。其他线程偶尔会向特定线程交付数据,或是通过触发事件的方式来进行处理。不过总体而言,每个线程只需要关注自己所要做的事情即可。其本身就是良好的设计,每一段代码只对自己的部分负责。
分离关注
当有多个任务需要持续运行一段时间,或需要及时进行处理的事件(比如,按键事件或传入网络数据),且还有其他任务正在运行时,单线程应用采用的是单责任原则处理冲突。单线程中代码会执行任务 A 后,再去执行任务 B,再检查按钮事件,再检查传入的网络包,然后在循环回去,执行任务 A。这将会使得任务 A 复杂化,因为需要存储完成状态,以及定期从主循环中返回。如果在循环中添加了很多任务,那么程序将运行的很慢。并且用户会发现,在按下按键后,很久之后才会有反应。
当使用独立线程执行任务时,操作系统会帮忙处理接口问题。执行任务 A 时,线程可以专注于执行任务,而不用为保存状态从主循环中返回。操作系统会自动保存状态,当需要的时候将线程切换到任务 B 或任务 C。如果目标系统是带有多核或多处理器,任务 A 和任务 B 可很可能真正的并发执行。处理按键时间或网络包的代码,就能及时执行了。所有事情都完成的很好,用户得到了及时的响应。当然,作为开发者只需要写具体操作的代码即可,不用将控制分支和用户交互混在一起了。
当通过任务类型对线程间的任务进行划分时,不应该让线程处于隔离态。当多个输入数据集需要使用同样的操作序列,可以将序列中的操作分成多个阶段让线程执行。
划分任务序列
当任务会应用到相同操作序列,去处理独立的数据项时,就可以使用流水线模式进行并发。
使用这种方式划分工作,可以为流水线中的每一阶段操作创建一个独立线程。当一个操作完成,数据元素会放在队列中,供下一阶段的线程使用。这就允许第一个线程在完成对于第一个数据块的操作时,第二个线程可以对第一个数据块执行管线中的第二个操作。
并发代码的性能 处理器数量 C++ 标准线程库提供 std::thread::hardware_concurrency(),使用这个函数就能知道在给定硬件上可以扩展的线程数量了。使用 std::thread::hardware_concurrency() 需要谨慎,因为不会考虑其他应用已使用的线程数量(除非已经将系统信息进行共享)。std::async() 可以避免这个问题,标准库会对所有调用进行安排。
数据争用与乒乓缓存 当两个线程在不同处理器上时,对同一数据进行读取,通常不会出现问题。因为数据会拷贝到每个线程的缓存中,并让两个处理器同时进行处理。当有线程对数据进行修改,并且需要更新到其他核芯的缓存中去,就要耗费一定的时间。这样的修改可能会让第二个处理器停下来,等待内存更新缓存中的数据,这是一个特别特别慢的操作。
对于下面这段代码:
1 2 3 4 5 6 std::atomic<unsigned long > counter (0 ) ;void processing_loop () { while (counter.fetch_add (1 , std::memory_order_relaxed) < 100000000 ) { do_something (); } }
counter 变量是全局的,任何线程都能调用 processing_loop()。因此,每次对 counter 进行增量操作时,处理器必须确保缓存中的 counter 是最新值,然后进行修改,再告知其他处理器。fetch_add 是一个“读-改-写”操作,因此要对最新的值进行检索。如果另一个线程在另一个处理器上执行同样的代码,counter 的数据需要在两个处理器之间进行传递。如果 do_something() 足够短,或有很多处理器来对这段代码进行处理时,处理器会互相等待。一个处理器准备更新这个值,另一个处理器在修改这个值,所以该处理器就需要等待第二个处理器更新完成,并且完成更新传递时才能执行更新,这种情况被称为高竞争(high contention),如果处理器很少需要互相等待就是低竞争(low contention)。
循环中 counter 的数据将在每个缓存中传递若干次,这就是乒乓缓存(cache ping-pong),这会对应用的性能有着重大的影响。当处理器因为等待缓存转移而停止运行时,这个处理器就不能做任何事情。
如何避免乒乓缓存呢?答案就是:减少两个线程对同一个内存位置的竞争。
虽然,要实现起来并不简单。即使给定内存位置,因为伪共享(false sharing)可能还是会有乒乓缓存。
伪共享 处理器处理缓存的单位是缓存行(cache lines),如果两个数据在内存上距离比较近,那么它们就有可能位于同一个缓存行。在这种情况下,即使多线程同时访问不同的内存地址,也有可能出现乒乓缓存。
C++17 标准在头文件 中定义了 std::hardware_destructive_interference_size,它指定了当前编译目标可能共享的连续字节的最大数目。如果确保数据间隔大于等于这个字节数,就不会有错误的共享存在了。
在实践中设计并发代码 并行版 std::for_each std::for_each 的原理很简单:其对某个范围中的元素,依次使用用户提供的函数。不需要返回结果,要将异常传递给调用者,需要使用 std::packaged_task 和 std::future 对线程中的异常进行转移。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class join_threads { std::vector<std::thread>& threads; public : explicit join_threads (std::vector<std::thread>& threads_) : threads(threads_) { } ~join_threads () { for (unsigned long i = 0 ; i < threads.size (); ++i) { if (threads[i].joinable ()) threads[i].join (); } } }; template <typename Iterator, typename Func>void parallel_for_each (Iterator first, Iterator last, Func f) { unsigned long const length = std::distance (first, last); if (!length) { return ;mtle } unsigned long const min_per_thread = 25 ; unsigned long const max_threads = (length + min_per_thread - 1 ) / min_per_thread; unsigned long const hardware_threads = std::thread::hardware_concurrency (); unsigned long const num_threads = std::min (hardware_threads != 0 ? hardware_threads : 2 , max_threads); unsigned long const block_size = length / num_threads; std::vector<std::future<void >> futures (num_threads - 1 ); std::vector<std::thread> threads (num_threads - 1 ) ; join_threads joiner (threads) ; Iterator block_start = first; for (unsigned long i = 0 ; i < (num_threads - 1 ); ++i) { Iterator block_end = block_start; std::advance (block_end, block_size); std::packaged_task<void (void ) > task ([=]() { std::for_each(block_start, block_end, f); }) ; futures[i] = task.get_future (); threads[i] = std::thread (std::move (task)); block_start = block_end; } std::for_each(block_start, last, f); for (unsigned long i = 0 ; i < (num_threads - 1 ); ++i) { futures[i].get (); } }
实现并行 std::accumulate 时,使用 std::async 会简化代码。同样,parallel_for_each 也可以使用 std::async。因为不知道需要使用多少个线程,所以在运行时对数据进行迭代划分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 template <typename Iterator, typename Func>void parallel_for_each (Iterator first, Iterator last, Func f) { unsigned long const length = std::distance (first, last); if (!length) return ; unsigned long const min_per_thread = 25 ; if (length < (2 * min_per_thread)) { std::for_each(first, last, f); } else { Iterator const mid_point = first + length / 2 ; std::future<void > first_half = std::async (¶llel_for_each<Iterator, Func>, first, mid_point, f); parallel_for_each (mid_point, last, f); first_half.get (); } }
并行版 std::find 因为不需要对数据元素做处理,所以当第一个元素就满足查找标准时,就没有必要对剩余元素进行搜索了。中断其他线程的一个办法就是使用原子变量作为标识,处理过每一个元素后就对这个标识进行检查。如果标识设置了,就意味有线程找到了匹配元素,算法就可以停止并返回。用这种方式来中断线程就可以将没有处理的数据保持原样,并在更多的情况下,比串行运行性能上能提升很多。缺点就是,加载原子变量是一个很慢的操作,会阻碍线程的运行。
获取返回值和异常有两个选择:
使用一个 future 数组,使用 std::packaged_task 来转移值和异常,主线程上对返回值和异常进行处理
使用 std::promise 对工作线程上的最终结果直接进行设置
这完全依赖于怎么样处理工作线程上的异常:如果想当第一个异常出现时停止(即使还没有对所有元素进行处理),就可以使用 std::promise 对异常和最终值进行设置;如果想让其他工作线程继续查找,可以使用std::packaged_task 来存储异常,当线程没有找到匹配元素时异常将再次抛出。一般情况下会选择前者,因为其行为和 std::find 更为接近。
对于下面的代码,当线程调用 find_element 查询一个值,或者抛出一个异常时,如果其他线程看到 done_flag 已设置,其他线程会终止。如果多线程同时找到匹配值或抛出异常,将会产生竞争。不过,这是良性的条件竞争,成功的竞争者会作为“第一个”返回线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 template <typename Iterator, typename MatchType>Iterator parallel_find (Iterator first, Iterator last, MatchType match) { struct find_element { void operator () (Iterator begin, Iterator end, MatchType match, std::promise<Iterator>* result, std::atomic<bool >* done_flag) { try { for (; (begin != end) && !done_flag->load (); ++begin) { if (*begin == match) { result->set_value (begin); done_flag->store (true ); return ; } } } catch (...) { try { result->set_exception (std::current_exception ()); done_flag->store (true ); } catch (...) { } } } }; unsigned long const length = std::distance (first, last); if (!length) { return last; } unsigned long const min_per_thread = 25 ; unsigned long const max_threads = (length + min_per_thread - 1 ) / min_per_thread; unsigned long const hardware_threads = std::thread::hardware_concurrency (); unsigned long const num_threads = std::min (hardware_threads != 0 ? hardware_threads : 2 , max_threads); unsigned long const block_size = length / num_threads; std::promise<Iterator> result; std::atomic<bool > done_flag (false ) ; std::vector<std::thread> threads (num_threads - 1 ) ; { join_threads joiner (threads) ; Iterator block_start = first; for (unsigned long i = 0 ; i < (num_threads - 1 ); ++i) { Iterator block_end = block_start; std::advance (block_end, block_size); threads[i] = std::thread (find_element (), block_start, block_end, match, &result, &done_flag); block_start = block_end; } find_element ()(block_start, last, match, &result, &done_flag); } if (!done_flag.load ()) { return last; } return result.get_future ().get (); }
不过,假设使用硬件上所有可用的的线程,或使用其他机制对线程上的任务进行提前划分,可以使用 std::async 以及递归数据划分的方式来简化实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 template <typename Iterator, typename MatchType>Iterator parallel_find_impl (Iterator first, Iterator last, MatchType match, std::atomic<bool >& done) { try { unsigned long const length = std::distance (first, last); unsigned long const min_per_thread = 25 ; if (length < (2 * min_per_thread)) { for (; (first != last) && !done.load (); ++first) { if (*first == match) { done = true ; return first; } } return last; } else { Iterator const mid_point = first + (length / 2 ); std::future<Iterator> async_result = std::async (¶llel_find_impl<Iterator, MatchType>, mid_point, last, match, std::ref (done)); Iterator const direct_result = parallel_find_impl (first, mid_point, match, done); return (direct_result == mid_point) ? async_result.get () : direct_result; } } catch (...) { done = true ; throw ; } } template <typename Iterator, typename MatchType>Iterator parallel_find (Iterator first, Iterator last, MatchType match) { std::atomic<bool > done (false ) ; return parallel_find_impl (first, last, match, done); }
并行版 std::partial_sum