09 高级线程管理 | 《C++ Concurrency In Action》

本文最后更新于:2021年7月15日

线程池

简单的线程池

作为简单的线程池,其拥有固定数量的工作线程(通常工作线程数量与 std::thread::hardware_concurrency()相同)。工作需要完成时,可以调用函数将任务挂在任务队列中。每个工作线程都会从任务队列上获取任务,然后执行这个任务,执行完成后再回来获取新的任务。线程池中线程就不需要等待其他线程完成对应任务了。如果需要等待,就需要对同步进行管理。

下面的代码展示了一个最简单的线程池实现,注意成员声明的顺序很重要:,。

class thread_pool {
  // done 标志和 worker_queue 必须在 threads 数组之前声明,而 threads 必须在 joiner 前声明
  // 这样才能确保成员以正确的顺序销毁
  std::atomic_bool done;
  thread_safe_queue<std::function<void()> > work_queue;
  std::vector<std::thread> threads;
  join_threads joiner;

  void worker_thread() {
    while (!done) {
      // 从任务队列上获取任务,以及同时执行这些任务,执行一个循环直到设置 done 标志
      std::function<void()> task;
      if (work_queue.try_pop(task)) {
        task();
      } else {
        // 如果任务队列上没有任务,函数会调用 std::this_thread::yield() 让线程休息,并且给予其他线程向任务队列推送任务的机会
        std::this_thread::yield();
      }
    }
  }

 public:
  thread_pool() : done(false), joiner(threads) {
    unsigned const thread_count = std::thread::hardware_concurrency();

    try {
      for (unsigned i = 0; i < thread_count; ++i) {
        threads.push_back(std::thread(&thread_pool::worker_thread, this));
      }
    } catch (...) {
      done = true;
      throw;
    }
  }

  ~thread_pool() {
    done = true;
  }

  template <typename FunctionType>
  void submit(FunctionType f) {
    work_queue.push(std::function<void()>(f));
  }
};

等待线程池中的任务

一种特殊的情况是,执行任务的线程需要返回结果到主线程上进行处理。本这种情况下,需要用 future 对最终的结果进行转移。下面的代码展示了对简单线程池的修改,通过修改就能等待任务完成,以及在工作线程完成后,返回一个结果到等待线程中去,不过 std::packaged_task<> 实例是不可拷贝的,仅可移动,所以不能再使用 std::function<> 来实现任务队列,因为 std::function<> 需要存储可复制构造的函数对象。

class function_wrapper {
  struct impl_base {
    virtual void call() = 0;
    virtual ~impl_base() {}
  };

  std::unique_ptr<impl_base> impl;
  template <typename F>
  struct impl_type : impl_base {
    F f;
    impl_type(F&& f_) : f(std::move(f_)) {}
    void call() { f(); }
  };

 public:
  template <typename F>
  function_wrapper(F&& f) : impl(new impl_type<F>(std::move(f))) {}

  void operator()() { impl->call(); }

  function_wrapper() = default;

  function_wrapper(function_wrapper&& other) : impl(std::move(other.impl)) {}

  function_wrapper& operator=(function_wrapper&& other) {
    impl = std::move(other.impl);
    return *this;
  }

  function_wrapper(const function_wrapper&) = delete;
  function_wrapper(function_wrapper&) = delete;
  function_wrapper& operator=(const function_wrapper&) = delete;
};

class thread_pool {
  // 使用 function_wrapper,而非使用 std::function
  thread_safe_queue<function_wrapper> work_queue;

  void worker_thread() {
    while (!done) {
      function_wrapper task;
      if (work_queue.try_pop(task)) {
        task();
      } else {
        std::this_thread::yield();
      }
    }
  }

 public:
  using result_type = std::result_of<FunctionType()>::type
 
  template <typename FunctionType>
  // submit() 函数,返回 std::future<> 保存任务的返回值,并且允许调用者等待任务完全结束
  std::future<result_type> submit(FunctionType f) {
    std::packaged_task<result_type()> task(std::move(f));
    std::future<result_type> res(task.get_future());
    work_queue.push(std::move(task));
    return res;
  }
};

下面的代码展示了如何让 parallel_accumuate 函数使用线程池:

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
  unsigned long const length = std::distance(first, last);

  if (!length) {
    return init;
  }

  unsigned long const block_size = 25;
  unsigned long const num_blocks = (length + block_size - 1) / block_size;

  std::vector<std::future<T>> futures(num_blocks - 1);
  thread_pool pool;

  Iterator block_start = first;
  for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
    Iterator block_end = block_start;
    std::advance(block_end, block_size);
    futures[i] = pool.submit([=] { accumulate_block<Iterator, T>()(block_start, block_end); });
    block_start = block_end;
  }
  T last_result = accumulate_block<Iterator, T>()(block_start, last);
  T result = init;
  for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
    result += futures[i].get();
  }
  result += last_result;
  return result;
}

工作量是依据使用的块数(num_blocks),而不是线程的数量。为了利用线程池的最大化可扩展性,需要将工作块划分为最小工作块。当线程池中线程不多时,每个线程将会处理多个工作块,不过随着硬件可用线程数量的增长,会有越来越多的工作块并发执行。这个线程池还不错,因为任务都是相互独立的。不过,当任务队列中的任务有依赖关系时,就会遇到麻烦了。

等待依赖任务

以快速排序算法为例:数据与中轴数据项比较,在中轴项两侧分为大于和小于的两个序列,然后再对这两组序列进行排序。这两组序列会递归排序,最后会整合成一个全排序序列。要将这个算法写成并发模式,需要保证递归调用能够使用硬件的并发能力。

在第 8 章中,使用一个固定线程数量(根据硬件可用并发线程数)的结构体。这样的情况下,使用栈来挂起需要排序的数据块。每个线程在数据块排序前,会向数据栈上添加一组要排序的数据,然后对当前数据块排序结束后,接着对另一块进行排序。这会消耗有限的线程,所以等待其他线程完成排序可能会造成死锁。一种情况很可能会出现:所有线程都在等某一个数据块进行排序,不过没有线程在做这块数据的排序。可以通过用主线程对该数据块进行排序来解决这个问题。

最简单的方法就是在 thread_pool 中添加一个新函数,来执行任务队列上的任务,并对线程池进行管理。

void thread_pool::run_pending_task() {
  function_wrapper task;
  if (work_queue.try_pop(task)) {
    task();
  } else {
    std::this_thread::yield();
  }
}

run_pending_task() 的实现去掉了在 worker_thread() 函数的主循环。函数任务队列中有任务的时候执行任务,没有的话就会让操作系统对线程进行重新分配。下面快速排序算法的实现要比之前简单许多,因为所有线程管理逻辑都移到线程池中了。

template <typename T>
struct sorter {
  thread_pool pool;

  std::list<T> do_sort(std::list<T>& chunk_data) {
    if (chunk_data.empty()) {
      return chunk_data;
    }

    std::list<T> result;
    result.splice(result.begin(), chunk_data, chunk_data.begin());
    T const& partition_val = *result.begin();

    typename std::list<T>::iterator divide_point =
        std::partition(chunk_data.begin(), chunk_data.end(),
                       [&](T const& val) { return val < partition_val; });

    std::list<T> new_lower_chunk;
    new_lower_chunk.splice(new_lower_chunk.end(), chunk_data,
                           chunk_data.begin(), divide_point);

    std::future<std::list<T> > new_lower = pool.submit(
        std::bind(&sorter::do_sort, this, std::move(new_lower_chunk)));

    std::list<T> new_higher(do_sort(chunk_data));

    result.splice(result.end(), new_higher);
    while (!new_lower.wait_for(std::chrono::seconds(0)) ==
           std::future_status::timeout) {
      // 线程和任务管理在线程等待的时候,就会执行任务队列上未完成的任务
      pool.run_pending_task();
    }

    result.splice(result.begin(), new_lower.get());
    return result;
  }
};

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
  if (input.empty()) {
    return input;
  }
  sorter<T> s;

  return s.do_sort(input);
}

避免队列中的任务竞争

线程每次调用线程池的 submit(),都会推送一个任务到工作队列中。就像工作线程为了执行任务,从任务队列中获取任务一样。随着处理器的增加,任务队列上就会有很多的竞争,会让性能下降。使用无锁队列会让任务没有明显的等待,但乒乓缓存会消耗大量的时间。为了避免乒乓缓存,每个线程建立独立的任务队列。这样,每个线程就会将新任务放在自己的任务队列上,并且当线程上的任务队列没有任务时,去全局的任务列表中取任务。下面的实现使用了一个 thread_local 变量,来保证每个线程都拥有自己的任务列表。

class thread_pool {
  thread_safe_queue<function_wrapper> pool_work_queue;

  typedef std::queue<function_wrapper> local_queue_type;
  static thread_local std::unique_ptr<local_queue_type> local_work_queue;

  void worker_thread() {
    local_work_queue.reset(new local_queue_type);
    while (!done) {
      run_pending_task();
    }
  }

 public:
  template <typename FunctionType>
  std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) {
    typedef typename std::result_of<FunctionType()>::type result_type;

    std::packaged_task<result_type()> task(f);
    std::future<result_type> res(task.get_future());
    // 检查当前线程是否具有一个工作队列
    if (local_work_queue) {
      // 将任务放入线程的本地队列中
      local_work_queue->push(std::move(task));
    } else {
      // 将这个任务放在线程池中的全局队列中
      pool_work_queue.push(std::move(task));
    }
    return res;
  }

  void run_pending_task() {
    function_wrapper task;
    if (local_work_queue && !local_work_queue->empty()) {
      task = std::move(local_work_queue->front());
      local_work_queue->pop();
      task();
    } else if (pool_work_queue.try_pop(task)) {
      // 如果本地线程上没有任务,就会从全局工作列表上获取任务
      task();
    } else {
      std::this_thread::yield();
    }
  }
};

这样就能有效的避免竞争,不过当任务分配不均时,造成的结果就是:某个线程本地队列中有很多任务的同时,其他线程无所事事。幸好这个问题有解:本地工作队列和全局工作队列上没有任务时,可从别的线程队列中窃取任务。

窃取任务

为了让没有任务的线程从其他线程的任务队列中获取任务,就需要本地任务列表可以被其他线程访问。这需要每个线程在线程池队列上进行注册,或由线程池指定一个线程。同样,还需要保证数据队列中的任务适当的被同步和保护,这样队列的不变量就不会被破坏。

实现一个无锁队列,让其线程在其他线程上窃取任务时,有推送和弹出一个任务的可能。不过,这个队列的实现超出了本书的讨论范围。为了证明这种方法的可行性,将使用一个互斥量来保护队列中的数据。我们希望任务窃取是不常见的现象,这样就会减少对互斥量的竞争,并且使得简单队列的开销最小。下面,实现了一个简单的基于锁的任务窃取队列。

class work_stealing_queue {
 private:
  typedef function_wrapper data_type;
  // 对 std::deque<fuction_wrapper> 进行了简单的包装,能通过一个互斥锁来对所有访问进行控制
  std::deque<data_type> the_queue;
  mutable std::mutex the_mutex;

 public:
  work_stealing_queue() {}

  work_stealing_queue(const work_stealing_queue& other) = delete;
  work_stealing_queue& operator=(const work_stealing_queue& other) = delete;

  void push(data_type data) {  // 2
    std::lock_guard<std::mutex> lock(the_mutex);
    the_queue.push_front(std::move(data));
  }

  bool empty() const {
    std::lock_guard<std::mutex> lock(the_mutex);
    return the_queue.empty();
  }

  bool try_pop(data_type& res) {  // 3
    std::lock_guard<std::mutex> lock(the_mutex);
    if (the_queue.empty()) {
      return false;
    }

    res = std::move(the_queue.front());
    the_queue.pop_front();
    return true;
  }
  
  // 对队列的后端进行操作
  bool try_steal(data_type& res) {
    std::lock_guard<std::mutex> lock(the_mutex);
    if (the_queue.empty()) {
      return false;
    }

    res = std::move(the_queue.back());
    the_queue.pop_back();
    return true;
  }
};

每个线程中的“队列”是一个后进先出的栈,最新推入的任务将会第一个执行。从缓存角度来看,这将对性能有所提升,因为任务相关的数据一直存于缓存中,要比提前将任务相关数据推送到栈上好。try_steal() 从队列末尾获取任务,为了减少与 try_pop() 之间的竞争。

class thread_pool {
  typedef function_wrapper task_type;

  std::atomic_bool done;
  thread_safe_queue<task_type> pool_work_queue;
  // 每个线程自己的工作队列将存储在线程池的全局工作队列中
  std::vector<std::unique_ptr<work_stealing_queue>> queues;
  std::vector<std::thread> threads;
  join_threads joiner;

  // 每个线程有一个属于自己的工作队列
  static thread_local work_stealing_queue* local_work_queue;
  static thread_local unsigned my_index;

  void worker_thread(unsigned my_index_) {
    my_index = my_index_;
    // 列表中队列的序号,会传递给线程函数,然后使用序号来索引队列
    local_work_queue = queues[my_index].get();
    while (!done) {
      run_pending_task();
    }
  }

  bool pop_task_from_local_queue(task_type& task) {
    return local_work_queue && local_work_queue->try_pop(task);
  }

  bool pop_task_from_pool_queue(task_type& task) {
    return pool_work_queue.try_pop(task);
  }

  bool pop_task_from_other_thread_queue(task_type& task) {  // 遍历池中所有线程的任务队列,然后尝试窃取任务
    for (unsigned i = 0; i < queues.size(); ++i) {
      // 为了避免每个线程都尝试从列表中的第一个线程上窃取任务,每一个线程都会从下一个线程开始遍历,通过自身的线程序号来确定开始遍历的线程序号
      unsigned const index = (my_index + i + 1) % queues.size();
      if (queues[index]->try_steal(task)) {
        return true;
      }
    }
    return false;
  }

 public:
  thread_pool() : done(false), joiner(threads) {
    unsigned const thread_count = std::thread::hardware_concurrency();

    try {
      for (unsigned i = 0; i < thread_count; ++i) {
        queues.push_back(std::unique_ptr<work_stealing_queue>(new work_stealing_queue));
        threads.push_back(std::thread(&thread_pool::worker_thread, this, i));
      }
    } catch (...) {
      done = true;
      throw;
    }
  }

  ~thread_pool() { done = true; }

  template <typename FunctionType>
  std::future<typename std::result_of<FunctionType()>::type> submit(
      FunctionType f) {
    typedef typename std::result_of<FunctionType()>::type result_type;
    std::packaged_task<result_type()> task(f);
    std::future<result_type> res(task.get_future());
    if (local_work_queue) {
      local_work_queue->push(std::move(task));
    } else {
      pool_work_queue.push(std::move(task));
    }
    return res;
  }

  void run_pending_task() {
    task_type task;
    if (pop_task_from_local_queue(task) ||  // 从线程的任务队列中取出一个任务来执行
        pop_task_from_pool_queue(task) ||  // 或从线程池队列中获取一个任务
        pop_task_from_other_thread_queue(task))  // 或从其他线程的队列中获取一个任务
    {
      task();
    } else {
      std::this_thread::yield();
    }
  }
};

中断线程

很多情况下,使用信号来终止长时间运行的线程是合理的。这种线程的存在,可能是因为工作线程所在的线程池销毁,或是用户显式的取消了这个任务。不管是什么原因,原理都一样:需要使用信号来让未结束线程停止运行。这需要一种合适的方式让线程主动的停下来,而非戛然而止。

可能会给每种情况制定一个机制,但这样的意义不大。不仅因为用统一的机制会更容易在之后的场景中实现,而且写出来的中断代码不用担心在哪里使用。

启动和中断线程

可中断线程的外部接口如下:

class interruptible_thread {
public:
  template<typename FunctionType>
  interruptible_thread(FunctionType f);
  void join();
  void detach();
  bool joinable() const;
  void interrupt();
};

类内部可以使用 std::thread 来管理线程,并且使用一些自定义数据结构来处理中断。在不添加多余的数据的前提下,为了使断点能够正常使用,就需要使用一个没有参数的函数:interruption_point()。线程的使用者可以通过调用这个函数来在断点处终端线程。

thread_local 标志是不能使用普通的 std::thread 管理线程的主要原因,需要使用一种方法分配出一个可访问的 interruptible_thread 实例,就像新启动一个线程一样。使用已提供函数来做这件事情前,需要将 interruptible_thread 实例传递给 std::thread 的构造函数,创建一个能够执行的线程。

class interrupt_flag {
 public:
  void set();
  bool is_set() const;
};

thread_local interrupt_flag this_thread_interrupt_flag;

class interruptible_thread {
  std::thread internal_thread;
  interrupt_flag* flag;

 public:
  template <typename FunctionType>
  interruptible_thread(FunctionType f) {
    std::promise<interrupt_flag*> p;
    internal_thread = std::thread([f, &p] {
      p.set_value(&this_thread_interrupt_flag);
      f();
    });
    // 调用线程会等待 p 处于就绪态,并且将结果存入到 flag 中
    flag = p.get_future().get();
  }
  
  // 线程外部调用,flag 会被 set,由于 flag 与 this_thread_interrupt_flag 的指针关联
  // 所以这时 this_thread_interrupt_flag 也会被 set
  void interrupt() {
    if (flag) {
      flag->set();
    }
  }
};

检查线程是否中断

void interruption_point() {
  if (this_thread_interrupt_flag.is_set()) {
    throw thread_interrupted();
  }
}

// 将要在可中断线程中运行的函数
void foo() {
  while (!done) {
    interruption_point();
    process_next_item();
  }
}

当可中断线程外部调用 interrupt 函数时,this_thread_interrupt_flag 会被 set,函数将在 interruption_point() 处中断。尽管这可以工作,但不是理想的,更好的方式是用 std::condition_variable 来唤醒,而非在循环中持续运行。

使用条件变量中断等待

class interrupt_flag {
  std::atomic<bool> flag;
  std::condition_variable* thread_cond;
  std::mutex m;

 public:
  interrupt_flag() : thread_cond(nullptr) {}

  void set() {
    flag.store(true, std::memory_order_relaxed);
    std::lock_guard<std::mutex> l(m);
    if (thread_cond) thread_cond->notify_all();
  }

  bool is_set() const { return flag.load(std::memory_order_relaxed); }

  void set_condition_variable(std::condition_variable& cv) {
    std::lock_guard<std::mutex> l(m);
    thread_cond = &cv;
  }

  void clear_condition_variable() {
    std::lock_guard<std::mutex> l(m);
    thread_cond = nullptr;
  }

  struct clear_cv_on_destruct {
    ~clear_cv_on_destruct() {
      this_thread_interrupt_flag.clear_condition_variable();
    }
  };
};


// 与普通的 wait 不同,这个 wait 是可以被中断的
template <typename Predicate>
void interruptible_wait(std::condition_variable& cv, std::unique_lock<std::mutex>& lk, Predicate pred) {
  interruption_point();
  this_thread_interrupt_flag.set_condition_variable(cv);
  interrupt_flag::clear_cv_on_destruct guard;  // 下面的 wait_for 可能抛异常,所以需要 RAII 来清除标志
  while (!this_thread_interrupt_flag.is_set() && !pred()) {
    // 如果线程在等待期间因条件变量所中断,中断线程将广播条件变量,并唤醒等待该条件变量的线程检查中断(第二个检查中断)
    cv.wait_for(lk, std::chrono::milliseconds(1));
  }
  interruption_point();
}

退出时中断后台任务

假如有一个桌面搜索程序,除了与用户交互,程序还需要监控文件系统的状态,以识别任何更改并更新其索引。为了避免影响 GUI 的响应性,这个处理通常会交给一个后台线程,后台线程需要运行于程序的整个生命周期。这样的程序通常只在机器关闭时退出,而在其他情况下关闭程序,就需要井然有序地关闭后台线程,一个关闭方式就是中断。

std::mutex config_mutex;
std::vector<interruptible_thread> background_threads;

void background_thread(int disk_id) {
  while (true) {
    interruption_point();
    fs_change fsc = get_fs_changes(disk_id);
    if (fsc.has_changes()) {
      update_index(fsc);
    }
  }
}

void start_background_processing() {
  background_threads.push_back(interruptible_thread(background_thread, disk_1));
  background_threads.push_back(interruptible_thread(background_thread, disk_2));
}

int main() {
  start_background_processing();
  process_gui_until_exit();
  std::unique_lock<std::mutex> lk(config_mutex);
  for (unsigned i = 0; i < background_threads.size(); ++i) {
    background_threads[i].interrupt();
  }
  // 中断所有线程后再 join
  for (unsigned i = 0; i < background_threads.size(); ++i) {
    background_threads[i].join();
  }
}

不直接在一个循环里中断并 join 的目的是为了可以并发中断多个线程。中断不会立即完成,它们退出前必要地调用析构和异常处理的代码。如果对每个线程都中断后立即 join,就会造成中断线程的等待。