09 高级线程管理 | 《C++ Concurrency In Action》笔记

本系列笔记参考《C++ Concurrency in Action, 2nd Edition》及其中文翻译

线程池

简单的线程池

作为简单的线程池,其拥有固定数量的工作线程(通常工作线程数量与 std::thread::hardware_concurrency()相同)。工作需要完成时,可以调用函数将任务挂在任务队列中。每个工作线程都会从任务队列上获取任务,然后执行这个任务,执行完成后再回来获取新的任务。线程池中线程就不需要等待其他线程完成对应任务了。如果需要等待,就需要对同步进行管理。

下面的代码展示了一个最简单的线程池实现,注意成员声明的顺序很重要:,。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class thread_pool {
// done 标志和 worker_queue 必须在 threads 数组之前声明,而 threads 必须在 joiner 前声明
// 这样才能确保成员以正确的顺序销毁
std::atomic_bool done;
thread_safe_queue<std::function<void()> > work_queue;
std::vector<std::thread> threads;
join_threads joiner;

void worker_thread() {
while (!done) {
// 从任务队列上获取任务,以及同时执行这些任务,执行一个循环直到设置 done 标志
std::function<void()> task;
if (work_queue.try_pop(task)) {
task();
} else {
// 如果任务队列上没有任务,函数会调用 std::this_thread::yield() 让线程休息,并且给予其他线程向任务队列推送任务的机会
std::this_thread::yield();
}
}
}

public:
thread_pool() : done(false), joiner(threads) {
unsigned const thread_count = std::thread::hardware_concurrency();

try {
for (unsigned i = 0; i < thread_count; ++i) {
threads.push_back(std::thread(&thread_pool::worker_thread, this));
}
} catch (...) {
done = true;
throw;
}
}

~thread_pool() {
done = true;
}

template <typename FunctionType>
void submit(FunctionType f) {
work_queue.push(std::function<void()>(f));
}
};

等待线程池中的任务

一种特殊的情况是,执行任务的线程需要返回结果到主线程上进行处理。本这种情况下,需要用 future 对最终的结果进行转移。下面的代码展示了对简单线程池的修改,通过修改就能等待任务完成,以及在工作线程完成后,返回一个结果到等待线程中去,不过 std::packaged_task<> 实例是不可拷贝的,仅可移动,所以不能再使用 std::function<> 来实现任务队列,因为 std::function<> 需要存储可复制构造的函数对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class function_wrapper {
struct impl_base {
virtual void call() = 0;
virtual ~impl_base() {}
};

std::unique_ptr<impl_base> impl;
template <typename F>
struct impl_type : impl_base {
F f;
impl_type(F&& f_) : f(std::move(f_)) {}
void call() { f(); }
};

public:
template <typename F>
function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f))) {}

void operator()() { impl->call(); }

function_wrapper() = default;

function_wrapper(function_wrapper&& other) : impl(std::move(other.impl)) {}

function_wrapper& operator=(function_wrapper&& other) {
impl = std::move(other.impl);
return *this;
}

function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};

class thread_pool {
// 使用 function_wrapper,而非使用 std::function
thread_safe_queue<function_wrapper> work_queue;

void worker_thread() {
while (!done) {
function_wrapper task;
if (work_queue.try_pop(task)) {
task();
} else {
std::this_thread::yield();
}
}
}

public:
using result_type = std::result_of<FunctionType()>::type

template <typename FunctionType>
// submit() 函数,返回 std::future<> 保存任务的返回值,并且允许调用者等待任务完全结束
std::future<result_type> submit(FunctionType f) {
std::packaged_task<result_type()> task(std::move(f));
std::future<result_type> res(task.get_future());
work_queue.push(std::move(task));
return res;
}
};

下面的代码展示了如何让 parallel_accumuate 函数使用线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);

if (!length) {
return init;
}

unsigned long const block_size = 25;
unsigned long const num_blocks = (length + block_size - 1) / block_size;

std::vector<std::future<T>> futures(num_blocks - 1);
thread_pool pool;

Iterator block_start = first;
for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
futures[i] = pool.submit([=] { accumulate_block<Iterator, T>()(block_start, block_end); });
block_start = block_end;
}
T last_result = accumulate_block<Iterator, T>()(block_start, last);
T result = init;
for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
result += futures[i].get();
}
result += last_result;
return result;
}

工作量是依据使用的块数(num_blocks),而不是线程的数量。为了利用线程池的最大化可扩展性,需要将工作块划分为最小工作块。当线程池中线程不多时,每个线程将会处理多个工作块,不过随着硬件可用线程数量的增长,会有越来越多的工作块并发执行。这个线程池还不错,因为任务都是相互独立的。不过,当任务队列中的任务有依赖关系时,就会遇到麻烦了。

等待依赖任务

以快速排序算法为例:数据与中轴数据项比较,在中轴项两侧分为大于和小于的两个序列,然后再对这两组序列进行排序。这两组序列会递归排序,最后会整合成一个全排序序列。要将这个算法写成并发模式,需要保证递归调用能够使用硬件的并发能力。

在第 8 章中,使用一个固定线程数量(根据硬件可用并发线程数)的结构体。这样的情况下,使用栈来挂起需要排序的数据块。每个线程在数据块排序前,会向数据栈上添加一组要排序的数据,然后对当前数据块排序结束后,接着对另一块进行排序。这会消耗有限的线程,所以等待其他线程完成排序可能会造成死锁。一种情况很可能会出现:所有线程都在等某一个数据块进行排序,不过没有线程在做这块数据的排序。可以通过用主线程对该数据块进行排序来解决这个问题。

最简单的方法就是在 thread_pool 中添加一个新函数,来执行任务队列上的任务,并对线程池进行管理。

1
2
3
4
5
6
7
8
void thread_pool::run_pending_task() {
function_wrapper task;
if (work_queue.try_pop(task)) {
task();
} else {
std::this_thread::yield();
}
}

run_pending_task() 的实现去掉了在 worker_thread() 函数的主循环。函数任务队列中有任务的时候执行任务,没有的话就会让操作系统对线程进行重新分配。下面快速排序算法的实现要比之前简单许多,因为所有线程管理逻辑都移到线程池中了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
template <typename T>
struct sorter {
thread_pool pool;

std::list<T> do_sort(std::list<T>& chunk_data) {
if (chunk_data.empty()) {
return chunk_data;
}

std::list<T> result;
result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_val = *result.begin();

typename std::list<T>::iterator divide_point =
std::partition(chunk_data.begin(), chunk_data.end(),
[&](T const& val) { return val < partition_val; });

std::list<T> new_lower_chunk;
new_lower_chunk.splice(new_lower_chunk.end(), chunk_data,
chunk_data.begin(), divide_point);

std::future<std::list<T> > new_lower = pool.submit(
std::bind(&sorter::do_sort, this, std::move(new_lower_chunk)));

std::list<T> new_higher(do_sort(chunk_data));

result.splice(result.end(), new_higher);
while (!new_lower.wait_for(std::chrono::seconds(0)) ==
std::future_status::timeout) {
// 线程和任务管理在线程等待的时候,就会执行任务队列上未完成的任务
pool.run_pending_task();
}

result.splice(result.begin(), new_lower.get());
return result;
}
};

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
if (input.empty()) {
return input;
}
sorter<T> s;

return s.do_sort(input);
}

避免队列中的任务竞争

线程每次调用线程池的 submit(),都会推送一个任务到工作队列中。就像工作线程为了执行任务,从任务队列中获取任务一样。随着处理器的增加,任务队列上就会有很多的竞争,会让性能下降。使用无锁队列会让任务没有明显的等待,但乒乓缓存会消耗大量的时间。为了避免乒乓缓存,每个线程建立独立的任务队列。这样,每个线程就会将新任务放在自己的任务队列上,并且当线程上的任务队列没有任务时,去全局的任务列表中取任务。下面的实现使用了一个 thread_local 变量,来保证每个线程都拥有自己的任务列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class thread_pool {
thread_safe_queue<function_wrapper> pool_work_queue;

typedef std::queue<function_wrapper> local_queue_type;
static thread_local std::unique_ptr<local_queue_type> local_work_queue;

void worker_thread() {
local_work_queue.reset(new local_queue_type);
while (!done) {
run_pending_task();
}
}

public:
template <typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) {
typedef typename std::result_of<FunctionType()>::type result_type;

std::packaged_task<result_type()> task(f);
std::future<result_type> res(task.get_future());
// 检查当前线程是否具有一个工作队列
if (local_work_queue) {
// 将任务放入线程的本地队列中
local_work_queue->push(std::move(task));
} else {
// 将这个任务放在线程池中的全局队列中
pool_work_queue.push(std::move(task));
}
return res;
}

void run_pending_task() {
function_wrapper task;
if (local_work_queue && !local_work_queue->empty()) {
task = std::move(local_work_queue->front());
local_work_queue->pop();
task();
} else if (pool_work_queue.try_pop(task)) {
// 如果本地线程上没有任务,就会从全局工作列表上获取任务
task();
} else {
std::this_thread::yield();
}
}
};

这样就能有效的避免竞争,不过当任务分配不均时,造成的结果就是:某个线程本地队列中有很多任务的同时,其他线程无所事事。幸好这个问题有解:本地工作队列和全局工作队列上没有任务时,可从别的线程队列中窃取任务。

窃取任务

为了让没有任务的线程从其他线程的任务队列中获取任务,就需要本地任务列表可以被其他线程访问。这需要每个线程在线程池队列上进行注册,或由线程池指定一个线程。同样,还需要保证数据队列中的任务适当的被同步和保护,这样队列的不变量就不会被破坏。

实现一个无锁队列,让其线程在其他线程上窃取任务时,有推送和弹出一个任务的可能。不过,这个队列的实现超出了本书的讨论范围。为了证明这种方法的可行性,将使用一个互斥量来保护队列中的数据。我们希望任务窃取是不常见的现象,这样就会减少对互斥量的竞争,并且使得简单队列的开销最小。下面,实现了一个简单的基于锁的任务窃取队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class work_stealing_queue {
private:
typedef function_wrapper data_type;
// 对 std::deque<fuction_wrapper> 进行了简单的包装,能通过一个互斥锁来对所有访问进行控制
std::deque<data_type> the_queue;
mutable std::mutex the_mutex;

public:
work_stealing_queue() {}

work_stealing_queue(const work_stealing_queue& other) = delete;
work_stealing_queue& operator=(const work_stealing_queue& other) = delete;

void push(data_type data) { // 2
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push_front(std::move(data));
}

bool empty() const {
std::lock_guard<std::mutex> lock(the_mutex);
return the_queue.empty();
}

bool try_pop(data_type& res) { // 3
std::lock_guard<std::mutex> lock(the_mutex);
if (the_queue.empty()) {
return false;
}

res = std::move(the_queue.front());
the_queue.pop_front();
return true;
}

// 对队列的后端进行操作
bool try_steal(data_type& res) {
std::lock_guard<std::mutex> lock(the_mutex);
if (the_queue.empty()) {
return false;
}

res = std::move(the_queue.back());
the_queue.pop_back();
return true;
}
};

每个线程中的“队列”是一个后进先出的栈,最新推入的任务将会第一个执行。从缓存角度来看,这将对性能有所提升,因为任务相关的数据一直存于缓存中,要比提前将任务相关数据推送到栈上好。try_steal() 从队列末尾获取任务,为了减少与 try_pop() 之间的竞争。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class thread_pool {
typedef function_wrapper task_type;

std::atomic_bool done;
thread_safe_queue<task_type> pool_work_queue;
// 每个线程自己的工作队列将存储在线程池的全局工作队列中
std::vector<std::unique_ptr<work_stealing_queue>> queues;
std::vector<std::thread> threads;
join_threads joiner;

// 每个线程有一个属于自己的工作队列
static thread_local work_stealing_queue* local_work_queue;
static thread_local unsigned my_index;

void worker_thread(unsigned my_index_) {
my_index = my_index_;
// 列表中队列的序号,会传递给线程函数,然后使用序号来索引队列
local_work_queue = queues[my_index].get();
while (!done) {
run_pending_task();
}
}

bool pop_task_from_local_queue(task_type& task) {
return local_work_queue && local_work_queue->try_pop(task);
}

bool pop_task_from_pool_queue(task_type& task) {
return pool_work_queue.try_pop(task);
}

bool pop_task_from_other_thread_queue(task_type& task) { // 遍历池中所有线程的任务队列,然后尝试窃取任务
for (unsigned i = 0; i < queues.size(); ++i) {
// 为了避免每个线程都尝试从列表中的第一个线程上窃取任务,每一个线程都会从下一个线程开始遍历,通过自身的线程序号来确定开始遍历的线程序号
unsigned const index = (my_index + i + 1) % queues.size();
if (queues[index]->try_steal(task)) {
return true;
}
}
return false;
}

public:
thread_pool() : done(false), joiner(threads) {
unsigned const thread_count = std::thread::hardware_concurrency();

try {
for (unsigned i = 0; i < thread_count; ++i) {
queues.push_back(std::unique_ptr<work_stealing_queue>(new work_stealing_queue));
threads.push_back(std::thread(&thread_pool::worker_thread, this, i));
}
} catch (...) {
done = true;
throw;
}
}

~thread_pool() { done = true; }

template <typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> submit(
FunctionType f) {
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(f);
std::future<result_type> res(task.get_future());
if (local_work_queue) {
local_work_queue->push(std::move(task));
} else {
pool_work_queue.push(std::move(task));
}
return res;
}

void run_pending_task() {
task_type task;
if (pop_task_from_local_queue(task) || // 从线程的任务队列中取出一个任务来执行
pop_task_from_pool_queue(task) || // 或从线程池队列中获取一个任务
pop_task_from_other_thread_queue(task)) // 或从其他线程的队列中获取一个任务
{
task();
} else {
std::this_thread::yield();
}
}
};

中断线程

很多情况下,使用信号来终止长时间运行的线程是合理的。这种线程的存在,可能是因为工作线程所在的线程池销毁,或是用户显式的取消了这个任务。不管是什么原因,原理都一样:需要使用信号来让未结束线程停止运行。这需要一种合适的方式让线程主动的停下来,而非戛然而止。

可能会给每种情况制定一个机制,但这样的意义不大。不仅因为用统一的机制会更容易在之后的场景中实现,而且写出来的中断代码不用担心在哪里使用。

启动和中断线程

可中断线程的外部接口如下:

1
2
3
4
5
6
7
8
9
class interruptible_thread {
public:
template<typename FunctionType>
interruptible_thread(FunctionType f);
void join();
void detach();
bool joinable() const;
void interrupt();
};

类内部可以使用 std::thread 来管理线程,并且使用一些自定义数据结构来处理中断。在不添加多余的数据的前提下,为了使断点能够正常使用,就需要使用一个没有参数的函数:interruption_point()。线程的使用者可以通过调用这个函数来在断点处终端线程。

thread_local 标志是不能使用普通的 std::thread 管理线程的主要原因,需要使用一种方法分配出一个可访问的 interruptible_thread 实例,就像新启动一个线程一样。使用已提供函数来做这件事情前,需要将 interruptible_thread 实例传递给 std::thread 的构造函数,创建一个能够执行的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class interrupt_flag {
public:
void set();
bool is_set() const;
};

thread_local interrupt_flag this_thread_interrupt_flag;

class interruptible_thread {
std::thread internal_thread;
interrupt_flag* flag;

public:
template <typename FunctionType>
interruptible_thread(FunctionType f) {
std::promise<interrupt_flag*> p;
internal_thread = std::thread([f, &p] {
p.set_value(&this_thread_interrupt_flag);
f();
});
// 调用线程会等待 p 处于就绪态,并且将结果存入到 flag 中
flag = p.get_future().get();
}

// 线程外部调用,flag 会被 set,由于 flag 与 this_thread_interrupt_flag 的指针关联
// 所以这时 this_thread_interrupt_flag 也会被 set
void interrupt() {
if (flag) {
flag->set();
}
}
};

检查线程是否中断

1
2
3
4
5
6
7
8
9
10
11
12
13
void interruption_point() {
if (this_thread_interrupt_flag.is_set()) {
throw thread_interrupted();
}
}

// 将要在可中断线程中运行的函数
void foo() {
while (!done) {
interruption_point();
process_next_item();
}
}

当可中断线程外部调用 interrupt 函数时,this_thread_interrupt_flag 会被 set,函数将在 interruption_point() 处中断。尽管这可以工作,但不是理想的,更好的方式是用 std::condition_variable 来唤醒,而非在循环中持续运行。

使用条件变量中断等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class interrupt_flag {
std::atomic<bool> flag;
std::condition_variable* thread_cond;
std::mutex m;

public:
interrupt_flag() : thread_cond(nullptr) {}

void set() {
flag.store(true, std::memory_order_relaxed);
std::lock_guard<std::mutex> l(m);
if (thread_cond) thread_cond->notify_all();
}

bool is_set() const { return flag.load(std::memory_order_relaxed); }

void set_condition_variable(std::condition_variable& cv) {
std::lock_guard<std::mutex> l(m);
thread_cond = &cv;
}

void clear_condition_variable() {
std::lock_guard<std::mutex> l(m);
thread_cond = nullptr;
}

struct clear_cv_on_destruct {
~clear_cv_on_destruct() {
this_thread_interrupt_flag.clear_condition_variable();
}
};
};


// 与普通的 wait 不同,这个 wait 是可以被中断的
template <typename Predicate>
void interruptible_wait(std::condition_variable& cv, std::unique_lock<std::mutex>& lk, Predicate pred) {
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard; // 下面的 wait_for 可能抛异常,所以需要 RAII 来清除标志
while (!this_thread_interrupt_flag.is_set() && !pred()) {
// 如果线程在等待期间因条件变量所中断,中断线程将广播条件变量,并唤醒等待该条件变量的线程检查中断(第二个检查中断)
cv.wait_for(lk, std::chrono::milliseconds(1));
}
interruption_point();
}

退出时中断后台任务

假如有一个桌面搜索程序,除了与用户交互,程序还需要监控文件系统的状态,以识别任何更改并更新其索引。为了避免影响 GUI 的响应性,这个处理通常会交给一个后台线程,后台线程需要运行于程序的整个生命周期。这样的程序通常只在机器关闭时退出,而在其他情况下关闭程序,就需要井然有序地关闭后台线程,一个关闭方式就是中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
std::mutex config_mutex;
std::vector<interruptible_thread> background_threads;

void background_thread(int disk_id) {
while (true) {
interruption_point();
fs_change fsc = get_fs_changes(disk_id);
if (fsc.has_changes()) {
update_index(fsc);
}
}
}

void start_background_processing() {
background_threads.push_back(interruptible_thread(background_thread, disk_1));
background_threads.push_back(interruptible_thread(background_thread, disk_2));
}

int main() {
start_background_processing();
process_gui_until_exit();
std::unique_lock<std::mutex> lk(config_mutex);
for (unsigned i = 0; i < background_threads.size(); ++i) {
background_threads[i].interrupt();
}
// 中断所有线程后再 join
for (unsigned i = 0; i < background_threads.size(); ++i) {
background_threads[i].join();
}
}

不直接在一个循环里中断并 join 的目的是为了可以并发中断多个线程。中断不会立即完成,它们退出前必要地调用析构和异常处理的代码。如果对每个线程都中断后立即 join,就会造成中断线程的等待。

09 高级线程管理 | 《C++ Concurrency In Action》笔记

http://www.zh0ngtian.tech/posts/97b44340.html

作者

zhongtian

发布于

2021-07-10

更新于

2023-12-16

许可协议

评论