本系列笔记参考《C++ Concurrency in Action, 2nd Edition》及其中文翻译。
线程池
简单的线程池
作为简单的线程池,其拥有固定数量的工作线程(通常工作线程数量与 std::thread::hardware_concurrency()相同)。工作需要完成时,可以调用函数将任务挂在任务队列中。每个工作线程都会从任务队列上获取任务,然后执行这个任务,执行完成后再回来获取新的任务。线程池中线程就不需要等待其他线程完成对应任务了。如果需要等待,就需要对同步进行管理。
下面的代码展示了一个最简单的线程池实现,注意成员声明的顺序很重要:,。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| class thread_pool { std::atomic_bool done; thread_safe_queue<std::function<void()> > work_queue; std::vector<std::thread> threads; join_threads joiner;
void worker_thread() { while (!done) { std::function<void()> task; if (work_queue.try_pop(task)) { task(); } else { std::this_thread::yield(); } } }
public: thread_pool() : done(false), joiner(threads) { unsigned const thread_count = std::thread::hardware_concurrency();
try { for (unsigned i = 0; i < thread_count; ++i) { threads.push_back(std::thread(&thread_pool::worker_thread, this)); } } catch (...) { done = true; throw; } }
~thread_pool() { done = true; }
template <typename FunctionType> void submit(FunctionType f) { work_queue.push(std::function<void()>(f)); } };
|
等待线程池中的任务
一种特殊的情况是,执行任务的线程需要返回结果到主线程上进行处理。本这种情况下,需要用 future 对最终的结果进行转移。下面的代码展示了对简单线程池的修改,通过修改就能等待任务完成,以及在工作线程完成后,返回一个结果到等待线程中去,不过 std::packaged_task<> 实例是不可拷贝的,仅可移动,所以不能再使用 std::function<> 来实现任务队列,因为 std::function<> 需要存储可复制构造的函数对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| class function_wrapper { struct impl_base { virtual void call() = 0; virtual ~impl_base() {} };
std::unique_ptr<impl_base> impl; template <typename F> struct impl_type : impl_base { F f; impl_type(F&& f_) : f(std::move(f_)) {} void call() { f(); } };
public: template <typename F> function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f))) {}
void operator()() { impl->call(); }
function_wrapper() = default;
function_wrapper(function_wrapper&& other) : impl(std::move(other.impl)) {}
function_wrapper& operator=(function_wrapper&& other) { impl = std::move(other.impl); return *this; }
function_wrapper(const function_wrapper&) = delete; function_wrapper(function_wrapper&) = delete; function_wrapper& operator=(const function_wrapper&) = delete; };
class thread_pool { thread_safe_queue<function_wrapper> work_queue;
void worker_thread() { while (!done) { function_wrapper task; if (work_queue.try_pop(task)) { task(); } else { std::this_thread::yield(); } } }
public: using result_type = std::result_of<FunctionType()>::type template <typename FunctionType> std::future<result_type> submit(FunctionType f) { std::packaged_task<result_type()> task(std::move(f)); std::future<result_type> res(task.get_future()); work_queue.push(std::move(task)); return res; } };
|
下面的代码展示了如何让 parallel_accumuate 函数使用线程池:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| template <typename Iterator, typename T> T parallel_accumulate(Iterator first, Iterator last, T init) { unsigned long const length = std::distance(first, last);
if (!length) { return init; }
unsigned long const block_size = 25; unsigned long const num_blocks = (length + block_size - 1) / block_size;
std::vector<std::future<T>> futures(num_blocks - 1); thread_pool pool;
Iterator block_start = first; for (unsigned long i = 0; i < (num_blocks - 1); ++i) { Iterator block_end = block_start; std::advance(block_end, block_size); futures[i] = pool.submit([=] { accumulate_block<Iterator, T>()(block_start, block_end); }); block_start = block_end; } T last_result = accumulate_block<Iterator, T>()(block_start, last); T result = init; for (unsigned long i = 0; i < (num_blocks - 1); ++i) { result += futures[i].get(); } result += last_result; return result; }
|
工作量是依据使用的块数(num_blocks),而不是线程的数量。为了利用线程池的最大化可扩展性,需要将工作块划分为最小工作块。当线程池中线程不多时,每个线程将会处理多个工作块,不过随着硬件可用线程数量的增长,会有越来越多的工作块并发执行。这个线程池还不错,因为任务都是相互独立的。不过,当任务队列中的任务有依赖关系时,就会遇到麻烦了。
等待依赖任务
以快速排序算法为例:数据与中轴数据项比较,在中轴项两侧分为大于和小于的两个序列,然后再对这两组序列进行排序。这两组序列会递归排序,最后会整合成一个全排序序列。要将这个算法写成并发模式,需要保证递归调用能够使用硬件的并发能力。
在第 8 章中,使用一个固定线程数量(根据硬件可用并发线程数)的结构体。这样的情况下,使用栈来挂起需要排序的数据块。每个线程在数据块排序前,会向数据栈上添加一组要排序的数据,然后对当前数据块排序结束后,接着对另一块进行排序。这会消耗有限的线程,所以等待其他线程完成排序可能会造成死锁。一种情况很可能会出现:所有线程都在等某一个数据块进行排序,不过没有线程在做这块数据的排序。可以通过用主线程对该数据块进行排序来解决这个问题。
最简单的方法就是在 thread_pool 中添加一个新函数,来执行任务队列上的任务,并对线程池进行管理。
1 2 3 4 5 6 7 8
| void thread_pool::run_pending_task() { function_wrapper task; if (work_queue.try_pop(task)) { task(); } else { std::this_thread::yield(); } }
|
run_pending_task() 的实现去掉了在 worker_thread() 函数的主循环。函数任务队列中有任务的时候执行任务,没有的话就会让操作系统对线程进行重新分配。下面快速排序算法的实现要比之前简单许多,因为所有线程管理逻辑都移到线程池中了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| template <typename T> struct sorter { thread_pool pool;
std::list<T> do_sort(std::list<T>& chunk_data) { if (chunk_data.empty()) { return chunk_data; }
std::list<T> result; result.splice(result.begin(), chunk_data, chunk_data.begin()); T const& partition_val = *result.begin();
typename std::list<T>::iterator divide_point = std::partition(chunk_data.begin(), chunk_data.end(), [&](T const& val) { return val < partition_val; });
std::list<T> new_lower_chunk; new_lower_chunk.splice(new_lower_chunk.end(), chunk_data, chunk_data.begin(), divide_point);
std::future<std::list<T> > new_lower = pool.submit( std::bind(&sorter::do_sort, this, std::move(new_lower_chunk)));
std::list<T> new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher); while (!new_lower.wait_for(std::chrono::seconds(0)) == std::future_status::timeout) { pool.run_pending_task(); }
result.splice(result.begin(), new_lower.get()); return result; } };
template <typename T> std::list<T> parallel_quick_sort(std::list<T> input) { if (input.empty()) { return input; } sorter<T> s;
return s.do_sort(input); }
|
避免队列中的任务竞争
线程每次调用线程池的 submit(),都会推送一个任务到工作队列中。就像工作线程为了执行任务,从任务队列中获取任务一样。随着处理器的增加,任务队列上就会有很多的竞争,会让性能下降。使用无锁队列会让任务没有明显的等待,但乒乓缓存会消耗大量的时间。为了避免乒乓缓存,每个线程建立独立的任务队列。这样,每个线程就会将新任务放在自己的任务队列上,并且当线程上的任务队列没有任务时,去全局的任务列表中取任务。下面的实现使用了一个 thread_local 变量,来保证每个线程都拥有自己的任务列表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| class thread_pool { thread_safe_queue<function_wrapper> pool_work_queue;
typedef std::queue<function_wrapper> local_queue_type; static thread_local std::unique_ptr<local_queue_type> local_work_queue;
void worker_thread() { local_work_queue.reset(new local_queue_type); while (!done) { run_pending_task(); } }
public: template <typename FunctionType> std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) { typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(f); std::future<result_type> res(task.get_future()); if (local_work_queue) { local_work_queue->push(std::move(task)); } else { pool_work_queue.push(std::move(task)); } return res; }
void run_pending_task() { function_wrapper task; if (local_work_queue && !local_work_queue->empty()) { task = std::move(local_work_queue->front()); local_work_queue->pop(); task(); } else if (pool_work_queue.try_pop(task)) { task(); } else { std::this_thread::yield(); } } };
|
这样就能有效的避免竞争,不过当任务分配不均时,造成的结果就是:某个线程本地队列中有很多任务的同时,其他线程无所事事。幸好这个问题有解:本地工作队列和全局工作队列上没有任务时,可从别的线程队列中窃取任务。
窃取任务
为了让没有任务的线程从其他线程的任务队列中获取任务,就需要本地任务列表可以被其他线程访问。这需要每个线程在线程池队列上进行注册,或由线程池指定一个线程。同样,还需要保证数据队列中的任务适当的被同步和保护,这样队列的不变量就不会被破坏。
实现一个无锁队列,让其线程在其他线程上窃取任务时,有推送和弹出一个任务的可能。不过,这个队列的实现超出了本书的讨论范围。为了证明这种方法的可行性,将使用一个互斥量来保护队列中的数据。我们希望任务窃取是不常见的现象,这样就会减少对互斥量的竞争,并且使得简单队列的开销最小。下面,实现了一个简单的基于锁的任务窃取队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| class work_stealing_queue { private: typedef function_wrapper data_type; std::deque<data_type> the_queue; mutable std::mutex the_mutex;
public: work_stealing_queue() {}
work_stealing_queue(const work_stealing_queue& other) = delete; work_stealing_queue& operator=(const work_stealing_queue& other) = delete;
void push(data_type data) { std::lock_guard<std::mutex> lock(the_mutex); the_queue.push_front(std::move(data)); }
bool empty() const { std::lock_guard<std::mutex> lock(the_mutex); return the_queue.empty(); }
bool try_pop(data_type& res) { std::lock_guard<std::mutex> lock(the_mutex); if (the_queue.empty()) { return false; }
res = std::move(the_queue.front()); the_queue.pop_front(); return true; } bool try_steal(data_type& res) { std::lock_guard<std::mutex> lock(the_mutex); if (the_queue.empty()) { return false; }
res = std::move(the_queue.back()); the_queue.pop_back(); return true; } };
|
每个线程中的“队列”是一个后进先出的栈,最新推入的任务将会第一个执行。从缓存角度来看,这将对性能有所提升,因为任务相关的数据一直存于缓存中,要比提前将任务相关数据推送到栈上好。try_steal() 从队列末尾获取任务,为了减少与 try_pop() 之间的竞争。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| class thread_pool { typedef function_wrapper task_type;
std::atomic_bool done; thread_safe_queue<task_type> pool_work_queue; std::vector<std::unique_ptr<work_stealing_queue>> queues; std::vector<std::thread> threads; join_threads joiner;
static thread_local work_stealing_queue* local_work_queue; static thread_local unsigned my_index;
void worker_thread(unsigned my_index_) { my_index = my_index_; local_work_queue = queues[my_index].get(); while (!done) { run_pending_task(); } }
bool pop_task_from_local_queue(task_type& task) { return local_work_queue && local_work_queue->try_pop(task); }
bool pop_task_from_pool_queue(task_type& task) { return pool_work_queue.try_pop(task); }
bool pop_task_from_other_thread_queue(task_type& task) { for (unsigned i = 0; i < queues.size(); ++i) { unsigned const index = (my_index + i + 1) % queues.size(); if (queues[index]->try_steal(task)) { return true; } } return false; }
public: thread_pool() : done(false), joiner(threads) { unsigned const thread_count = std::thread::hardware_concurrency();
try { for (unsigned i = 0; i < thread_count; ++i) { queues.push_back(std::unique_ptr<work_stealing_queue>(new work_stealing_queue)); threads.push_back(std::thread(&thread_pool::worker_thread, this, i)); } } catch (...) { done = true; throw; } }
~thread_pool() { done = true; }
template <typename FunctionType> std::future<typename std::result_of<FunctionType()>::type> submit( FunctionType f) { typedef typename std::result_of<FunctionType()>::type result_type; std::packaged_task<result_type()> task(f); std::future<result_type> res(task.get_future()); if (local_work_queue) { local_work_queue->push(std::move(task)); } else { pool_work_queue.push(std::move(task)); } return res; }
void run_pending_task() { task_type task; if (pop_task_from_local_queue(task) || pop_task_from_pool_queue(task) || pop_task_from_other_thread_queue(task)) { task(); } else { std::this_thread::yield(); } } };
|
中断线程
很多情况下,使用信号来终止长时间运行的线程是合理的。这种线程的存在,可能是因为工作线程所在的线程池销毁,或是用户显式的取消了这个任务。不管是什么原因,原理都一样:需要使用信号来让未结束线程停止运行。这需要一种合适的方式让线程主动的停下来,而非戛然而止。
可能会给每种情况制定一个机制,但这样的意义不大。不仅因为用统一的机制会更容易在之后的场景中实现,而且写出来的中断代码不用担心在哪里使用。
启动和中断线程
可中断线程的外部接口如下:
1 2 3 4 5 6 7 8 9
| class interruptible_thread { public: template<typename FunctionType> interruptible_thread(FunctionType f); void join(); void detach(); bool joinable() const; void interrupt(); };
|
类内部可以使用 std::thread 来管理线程,并且使用一些自定义数据结构来处理中断。在不添加多余的数据的前提下,为了使断点能够正常使用,就需要使用一个没有参数的函数:interruption_point()。线程的使用者可以通过调用这个函数来在断点处终端线程。
thread_local 标志是不能使用普通的 std::thread 管理线程的主要原因,需要使用一种方法分配出一个可访问的 interruptible_thread 实例,就像新启动一个线程一样。使用已提供函数来做这件事情前,需要将 interruptible_thread 实例传递给 std::thread 的构造函数,创建一个能够执行的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| class interrupt_flag { public: void set(); bool is_set() const; };
thread_local interrupt_flag this_thread_interrupt_flag;
class interruptible_thread { std::thread internal_thread; interrupt_flag* flag;
public: template <typename FunctionType> interruptible_thread(FunctionType f) { std::promise<interrupt_flag*> p; internal_thread = std::thread([f, &p] { p.set_value(&this_thread_interrupt_flag); f(); }); flag = p.get_future().get(); } void interrupt() { if (flag) { flag->set(); } } };
|
检查线程是否中断
1 2 3 4 5 6 7 8 9 10 11 12 13
| void interruption_point() { if (this_thread_interrupt_flag.is_set()) { throw thread_interrupted(); } }
void foo() { while (!done) { interruption_point(); process_next_item(); } }
|
当可中断线程外部调用 interrupt 函数时,this_thread_interrupt_flag 会被 set,函数将在 interruption_point() 处中断。尽管这可以工作,但不是理想的,更好的方式是用 std::condition_variable 来唤醒,而非在循环中持续运行。
使用条件变量中断等待
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| class interrupt_flag { std::atomic<bool> flag; std::condition_variable* thread_cond; std::mutex m;
public: interrupt_flag() : thread_cond(nullptr) {}
void set() { flag.store(true, std::memory_order_relaxed); std::lock_guard<std::mutex> l(m); if (thread_cond) thread_cond->notify_all(); }
bool is_set() const { return flag.load(std::memory_order_relaxed); }
void set_condition_variable(std::condition_variable& cv) { std::lock_guard<std::mutex> l(m); thread_cond = &cv; }
void clear_condition_variable() { std::lock_guard<std::mutex> l(m); thread_cond = nullptr; }
struct clear_cv_on_destruct { ~clear_cv_on_destruct() { this_thread_interrupt_flag.clear_condition_variable(); } }; };
template <typename Predicate> void interruptible_wait(std::condition_variable& cv, std::unique_lock<std::mutex>& lk, Predicate pred) { interruption_point(); this_thread_interrupt_flag.set_condition_variable(cv); interrupt_flag::clear_cv_on_destruct guard; while (!this_thread_interrupt_flag.is_set() && !pred()) { cv.wait_for(lk, std::chrono::milliseconds(1)); } interruption_point(); }
|
退出时中断后台任务
假如有一个桌面搜索程序,除了与用户交互,程序还需要监控文件系统的状态,以识别任何更改并更新其索引。为了避免影响 GUI 的响应性,这个处理通常会交给一个后台线程,后台线程需要运行于程序的整个生命周期。这样的程序通常只在机器关闭时退出,而在其他情况下关闭程序,就需要井然有序地关闭后台线程,一个关闭方式就是中断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| std::mutex config_mutex; std::vector<interruptible_thread> background_threads;
void background_thread(int disk_id) { while (true) { interruption_point(); fs_change fsc = get_fs_changes(disk_id); if (fsc.has_changes()) { update_index(fsc); } } }
void start_background_processing() { background_threads.push_back(interruptible_thread(background_thread, disk_1)); background_threads.push_back(interruptible_thread(background_thread, disk_2)); }
int main() { start_background_processing(); process_gui_until_exit(); std::unique_lock<std::mutex> lk(config_mutex); for (unsigned i = 0; i < background_threads.size(); ++i) { background_threads[i].interrupt(); } for (unsigned i = 0; i < background_threads.size(); ++i) { background_threads[i].join(); } }
|
不直接在一个循环里中断并 join 的目的是为了可以并发中断多个线程。中断不会立即完成,它们退出前必要地调用析构和异常处理的代码。如果对每个线程都中断后立即 join,就会造成中断线程的等待。