bthread | bRPC

本文介绍了 bthread 基本概念和原理。

概述

bthread 是一个 M:N 线程库,M:N 是指 M 个 bthread 会映射至 N 个pthread,一般 M 远大于 N。由于 linux 当下的 pthread 实现(NPTL)是 1:1 的,M 个 bthread 也相当于映射至 N 个 LWP。

之所以要采用这么一种 M:N 的机制,是为了兼顾当前多核 CPU 以及调度竞争上,考虑两种极端情况,一是每个用户线程对应一个内核线程,如果是这种模型,对多核 CPU 的利用会很充分,但是调度成本(用户态内核态的切换)以及线程间的数据同步成本都比较高,而是所有的用户线程都在一个内核线程的情况,这种情况下调度成本和数据同步成本低,但很难利用多核 CPU 的能力,同时用户线程也容易 block。

bthread 的主要思想就是,M 个 bthread 可以运行在 N 个内核线程上,也就是有 N 个 worker 分别运行在 N 个 pthread 上,所有的 bthread 都是在 worker 上调度运行,worker 运行完一个 bthread 后就会去队列里调度下一个 bthread。既可以从当前 worker 的本地队列里调度,也可以从其他 worker 的队列里取,即 “work stealing” 机制。

主要类

TaskGroup

  • TaskGroup 1:1 对应 pthread
  • TaskGroup 是 1:N 的 bthread 调度器,一个 TaskGroup 是一个 pthread,但是可以管理多个归属于这个 TaskGroup 的所有 bthread Task,
    一个 task_group 维护者一个 run_queue 和一个 remote_queue。Remote_queue 用于存放非 TaskControl 中线程创建的 bthread。

TaskControl

TaskControl 采用单例模式,对 TaskGroup 进行管理。TaskControl 主要负责如下几大块的内容:

  • 管理所有的 TaskGroup,比如 start、stop、add
  • 统计 TaskGroup 的数据
  • 在 TaskControl 管辖的 pthread 之外的 pthread 创建的 bthread,由 TaskControl 随机选一个 TaskGroup 进行 bthread 投递
  • signal_task,用于唤醒没有任务在等待的 TaskGroup 处理任务
  • steal_task,用于 TaskGroup 在自身队列中没有任务情况下进行抢占任务

原理

最常用的启动 bthread 函数就是 bthread_start_urgent 和 bthread_start_background。调用前者会让出当前 worker 立即执行新 bthread,当前 bthread 随后调度。调用后者则是将要启动的 bthread 放入队列让后台调度。

1
2
3
4
5
6
7
8
9
10
11
int bthread_start_urgent(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
// start from worker
return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
}
return bthread::start_from_non_worker(tid, attr, fn, arg);
}

bthread_start_urgent 函数和 pthread_create 函数的接口类似。bthread_start_urgent 函数首先创建一个 TaskGroup 类型的 tls_task_group,然后使用 tls_task_group 执行任务。

tls_task_group 表明当前线程所归属的 TaskGroup,如果不为 null,则表明当前就是 bthread 而且 tls_task_group 指明了对应的 TaskGroup,在对应 TaskGroup(worker)执行新 bthread 的启动即可。TaskGroup::start_foreground 和 TaskGroup::start_background 的不同之处在于前者是直接通过代码 TaskGroup::sched_to(pg, m->tid); 调度执行,而后者则是通过 ready_to_run_remote 或者 ready_to_run 把任务加入队列然后调用 signal 按需唤醒 worker。

如果 tls_task_group 为 null,说明当前线程不是 bthread,此时会调用 start_from_non_worker 函数。该函数会调用 get_or_new_task_control 函数获取或者创建新的 TaskControl 单例。创建 TaskControl 会执行其 init 函数,核心就是用 pthread 启动指定数量的 worker,如下所示:

1
2
3
4
5
6
7
8
_workers.resize(_concurrency);   
for (int i = 0; i < _concurrency; ++i) {
const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
if (rc) {
LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
return -1;
}
}

上面代码中的 worker_thread 如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void* TaskControl::worker_thread(void* arg) {
run_worker_startfn();
#ifdef BAIDU_INTERNAL
logging::ComlogInitializer comlog_initializer;
#endif

TaskControl* c = static_cast<TaskControl*>(arg);
TaskGroup* g = c->create_group(); // 新建并初始化 task_group
TaskStatistics stat;
if (NULL == g) {
LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
return NULL;
}
BT_VLOG << "Created worker=" << pthread_self()
<< " bthread=" << g->main_tid();

tls_task_group = g;
c->_nworkers << 1;
g->run_main_task(); // 主入口

stat = g->main_stat();
BT_VLOG << "Destroying worker=" << pthread_self() << " bthread="
<< g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0
<< "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms";
tls_task_group = NULL;
g->destroy_self();
c->_nworkers << -1;
return NULL;
}

在 worker_thread 中创建 TaskGroup 并加入到 TaskControl 的成员变量 _groups 中,如果创建成功则调用 TaskGroup 的 run_main_task 函数,TaskGroup 进入等待任务的循环中。

上述代码中 run_main_task 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void TaskGroup::run_main_task() {
bvar::PassiveStatus<double> cumulated_cputime(
get_cumulated_cputime_from_this, this);
std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;

TaskGroup* dummy = this;
bthread_t tid;
while (wait_task(&tid)) {
TaskGroup::sched_to(&dummy, tid);
DCHECK_EQ(this, dummy);
DCHECK_EQ(_cur_meta->stack, _main_stack);
if (_cur_meta->tid != _main_tid) {
TaskGroup::task_runner(1/*skip remained*/);
}
if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
char name[32];
#if defined(OS_MACOSX)
snprintf(name, sizeof(name), "bthread_worker_usage_%" PRIu64,
pthread_numeric_id());
#else
snprintf(name, sizeof(name), "bthread_worker_usage_%ld",
(long)syscall(SYS_gettid));
#endif
usage_bvar.reset(new bvar::PerSecond<bvar::PassiveStatus<double> >
(name, &cumulated_cputime, 1));
}
}
// stop_main_task() was called.
// Don't forget to add elapse of last wait_task.
current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}

worker_thread 函数先是调用 create_group 新建并初始化 task_group ,随后调用 run_main_task,不断循环等待可以执行的 bthread,包括去其他的 worker steal。一旦拿到了可以执行的任务,则调用 sched_to 进行执行,里面是一些 context 的切换后执行之类的底层操作。前面提到的在一个 bthread 里执行bthread_start_urgent,会调用 TaskGroup::start_foreground,里面也是调用 TaskGroup::sched_to 立即让出当前 worker。上面调用的 TaskGroup::sched_to 函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// include/bthread/task_group_inl.h
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
TaskMeta* next_meta = address_meta(next_tid);
if (next_meta->stack == NULL) {
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
// stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
// In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
// This basically means that if we can't allocate stack, run
// the task in pthread directly.
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack((*pg)->_main_stack);
}
}
// Update now_ns only when wait_task did yield.
sched_to(pg, next_meta);
}

根据传入的 next_tid 取出对应 bthread 的 meta 信息,如果对应 meta 的 stack 为空,说明这是一个新建的 bthread,则会调用 get_stack 从一个 object pool 类型的资源池里取出 stack 对象赋给 bthread,object pool 继承自 resource pool。

确保 stack 就绪后再调用内部的另一个 TaskGroup::sched_to 的重载函数去执行切换操作。切换 threadlocal 变量的部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (cur_meta->stack != NULL) {
if (next_meta->stack != cur_meta->stack) {
jump_stack(cur_meta->stack, next_meta->stack);
// probably went to another group, need to assign g again.
g = tls_task_group;
}
#ifndef NDEBUG
else {
// else pthread_task is switching to another pthread_task, sc
// can only equal when they're both _main_stack
CHECK(cur_meta->stack == g->_main_stack);
}
#endif
}

inline void jump_stack(ContextualStack* from, ContextualStack* to) {
bthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}

bthread_jump_fcontext 函数是由汇编实现的,通过操作寄存器完成线程切换。

调度

在 bthread 中,worker 是基于 ParkingLot 进行调度的。没有任务的时候在这里停车和唤醒。ParkingLot 本质上就是基于 futex 的 wait/signal,关于 futex 可以参考关于同步的一点思考-上

ParkingLot 有四个类函数 signal、wait、stop、get_state:

  • signal 是唤醒 num_task 个等待在 _pending_signal 上的线程
  • wait 是如果 _pending_signal 的当前值和先前拿到的 expected_state.val 相等的话就 wait
  • stop 则是将停止的标识位置为 1,然后唤醒所有 wait 的线程,这里的 stop 指的就是 wait 的 stop
  • get_state 是获取用于 wait 的状态,就是直接返回 _pending_signal 的值

TaskGroup 类里和 bthread 调度直接相关的两个主要函数为 sched 和 sched_to,前者是让出当前 tg 按照调度规则从队列里调度下一个 bthread,后者是让出当前 tg 直接调度指定 tg,sched 函数的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void TaskGroup::sched(TaskGroup** pg) {
TaskGroup* g = *pg;
bthread_t next_tid = 0;
// Find next task to run, if none, switch to idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
const bool popped = g->_rq.pop(&next_tid);
#else
const bool popped = g->_rq.steal(&next_tid);
#endif
if (!popped && !g->steal_task(&next_tid)) {
// Jump to main task if there's no task to run.
next_tid = g->_main_tid;
}
sched_to(pg, next_tid);
}

WorkStealingQueue 的 push 和 pop 都是在 bottom 一侧,而 steal 是在 top 一侧,如果没有 BTHREAD_FAIR_WSQ 宏定义,会使用 pop,否则是用steal,从 FIFO 的角度来说,使用 steal 是更公平的,但是开销会更大(为什么开销会更大?)。

TaskGroup 的 run_main_task 函数就用到了前面提到的 ParkingLot,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bool TaskGroup::wait_task(bthread_t* tid) {
do {
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
if (_last_pl_state.stopped()) {
return false;
}
_pl->wait(_last_pl_state);
if (steal_task(tid)) {
return true;
}
#else
const ParkingLot::State st = _pl->get_state();
if (st.stopped()) {
return false;
}
if (steal_task(tid)) {
return true;
}
_pl->wait(st);
#endif
} while (true);
}

该函数不断循环判断 _pl 是否处于停止状态,如果是,则直接返回 -1,_pl 被调用 stop 后会进入停止状态,正常运行过程中 stop 不会被调用。函数还会循环尝试 steal_task,如果取到了 task 则返回 true,没取到则根据上次的状态进行 wait,因为是在循环里,根据 futex 的机制,如果上一个 state 和当前 _pl 上的state 一致,那么说明 _pl 上的任务没变化,继续 steal 没有意义,则 wait,否则说明有其他地方调用 _pl 上的 signal,也就是有新的任务加到某个队列里,_pending_signal 也会发生变化,steal 有可能成功。如果进入了 wait,在 _pl 的 signal 被调用的时候也会被唤醒。

steal_task 实质是调用 TaskControl 单例的 steal_task 函数,任务优先本地队列,然后 remote 队列,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
// 1: Acquiring fence is paired with releasing fence in _add_group to
// avoid accessing uninitialized slot of _groups.
const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
if (0 == ngroup) {
return false;
}

// NOTE: Don't return inside `for' iteration since we need to update |seed|
bool stolen = false;
size_t s = *seed;
for (size_t i = 0; i < ngroup; ++i, s += offset) {
TaskGroup* g = _groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g) {
if (g->_rq.steal(tid)) {
stolen = true;
break;
}
if (g->_remote_rq.pop(tid)) {
stolen = true;
break;
}
}
}
*seed = s;
return stolen;
}

bthread 的调度就是一直循环取 task 然后执行,有可能 wait 在某个 ParkingLot 上等待其他的唤醒。

参考

BRPC 官方文档
brpc源码解析(四)—— Bthread机制
brpc源码解析(七)—— worker基于ParkingLot的bthread调度
brpc的bthread解读
高性能RPC框架BRPC核心机制分析<一>

作者

zhongtian

发布于

2020-05-26

更新于

2023-12-16

许可协议

评论