协程学习

Q2 尝试对 c++ librdkafka 库进行了改造,将 pthread 切为了 bthread,以优化生产者场景下,连接大规模集群时线程数过多的问题。顺利基于 bthread 的代码在 ai 的辅助下对协程的实现原理进行了学习。(前三段非 ai 生成)

为什么需要协程

线程是操作系统进行 CPU 时间分配(调度)的最小单位,由线程来执行函数。当函数中执行阻塞操作,例如 sleep、等锁、IO 时,线程会被操作系统挂起进入等待状态,占用线程资源却不做任何有效工作。由于系统能同时维护的线程数量有限,大量线程阻塞等待会造成严重的线程资源浪费。比较理想的情况是,在进行阻塞操作时,将当前函数”挂起”,让这个线程可以执行其它的函数,这样可以高效利用线程资源。

协程的设计思路

从操作系统的视角来看,多个线程会共享一个 CPU 资源,当一个线程执行的时间片用完后,操作系统会将这个线程挂起,然后调度其它线程到这个 CPU 上来执行。对于被挂起的线程,由操作系统来保存它的上下文信息至线程控制块(TCB)中,包含寄存器快照、栈指针等信息。

根据这个思路,可以将需要挂起的函数使用的上下文信息,主要为一些寄存器的数据,保存在它使用的栈上,就可以达到生成一份 “快照” 的目的。在基于这个 “快照” 让函数恢复执行时,就可以从它的栈上将相关上下文信息读取出来进行恢复。这个就是协程实现的大致思路。

一个简易协程的结构体定义如下所示:

1
2
3
4
5
6
7
8
9
10
// 协程控制块(对应 bthread task_meta.h:49 的 TaskMeta)
struct Coroutine {
fcontext_t ctx; // 寄存器快照(= 当前 rsp)
void* stack; // malloc 分配的栈内存(低地址)
size_t stack_size;
CoState state;
void (*fn)(void*); // 用户协程函数
void* arg;
int64_t wake_time_us; // co_sleep 唤醒时间(epoch 微秒),0 表示未睡眠
};

可以看到这里维护了一个 stack ,我们可以将其称为协程栈。对于普通的函数来说,它使用的栈是线程栈,也即操作系统在线程创建后为它开辟的一片内存空间。而在协程模式下,由于多个函数(协程)会共用一个线程,不能公用一个栈,所以这里会为每一个协程都创建一个单独的栈。在生成协程执行的函数的快照时,会将上下文信息保存到这个协程栈上。需要额外注意的是,线程栈和协程栈没有本质的不同,都是一片地址空间,在 CPU 执行时完全等价,只不过负责创建和回收的角色不同,前者是操作系统,后者是协程调度器。

使用协程的方式

协程的主要目的是为了在函数执行阻塞操作时主动让出线程资源。对于 IO 场景,所有的协程就需要将 fd 交给专门的线程通过 epoll 等方式来监听,交出 fd 后主动挂起。在 fd 可读写后,再由专门的调度器来唤醒对应的协程来对 fd 进行读写操作。对于 sleep 的场景,则在调用 sleep 时直接将对应的协程挂起,等待协程调度器在对应的时间将协程唤起。

这里需要特别注意的是,不能在协程中调用为线程准备的相关函数,例如不能直接在协程中调用 poll ,因为这样就让运行协程的线程阻塞住了,效果和用线程执行没有区别。所以一个成熟的协程库,需要为会导致线程阻塞的方法提供类似的协程方法。

以下是一个使用协程库 API 的典型示例,两个协程并发执行,co_sleep 期间线程不阻塞,会自动去运行其他协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void co_A(void*) {
printf("[ 0ms] A-1\n");
co_sleep(200000); // 让出线程 200ms,期间线程去运行 co_B
printf("[200ms] A-2\n");
}

void co_B(void*) {
printf("[ 0ms] B-1\n");
co_sleep(100000); // 让出线程 100ms
printf("[100ms] B-2\n");
co_sleep(100000);
printf("[200ms] B-3\n");
}

int main() {
co_create(co_A, nullptr); // 创建协程,加入 ready_queue,不立即运行
co_create(co_B, nullptr);
co_run(); // 启动调度循环,阻塞直到所有协程完成
}

// 实际输出:
// [ 0ms] A-1
// [ 0ms] B-1
// [100ms] B-2 ← A 在 sleep,线程转去运行 B,并非真正等待 200ms
// [200ms] A-2
// [200ms] B-3

寄存器与函数栈

理解协程上下文切换的实现细节,需要先了解 CPU 寄存器和函数栈的工作方式,这是实现的直接操作对象。

寄存器

寄存器是位于 CPU 芯片内部的高速存储单元,访问速度比内存快约 100 倍,但数量极少。x86_64 架构有 16 个通用寄存器,每个 8 字节:

1
rax  rbx  rcx  rdx  rsi  rdi  rbp  rsp  r8  r9  r10  r11  r12  r13  r14  r15

其中 rsp(Stack Pointer)最为特殊,它始终指向当前栈的顶部。CPU 执行 pushpopcallret 等指令时都会隐式修改 rsp。协程切换的核心操作,正是通过直接替换 rsp 的值来完成的——rsp 指向哪个协程的栈,CPU 就在哪个协程的世界里运行。

Linux x86_64 的调用约定(System V ABI)规定了寄存器在函数调用时的职责分工:

  • 参数传递:调用函数时,前 6 个参数依次放入 rdirsirdxrcxr8r9,函数的返回值放入 rax

  • Caller-saved 寄存器raxrcxrdxrsirdir8r11):调用者负责保存。调用者若需要在函数调用后继续使用这些寄存器中的值,须在调用前自行压栈;被调函数可以随意覆盖这些寄存器。

  • Callee-saved 寄存器rbxrbpr12r15):被调函数负责保存。被调函数若需要使用这些寄存器,须在函数开头先压栈保存,在返回前恢复原值,确保调用者看不出变化。

这个分工对协程实现至关重要:jump_fcontext 作为被调函数,只需保存 callee-saved 的 6 个寄存器,caller-saved 寄存器的保存已由编译器在调用方的代码中自动处理。

函数栈与栈帧

栈是一片连续内存,地址从高向低增长push X 等价于 rsp -= 8; [rsp] = Xpop X 等价于 X = [rsp]; rsp += 8call func 在跳转前会自动将下一条指令的地址(返回地址)压栈,ret 则将其弹出并跳转过去。

每个函数被调用时,编译器自动生成序言(prologue)和尾声(epilogue),在栈上划出属于自己的一块区域,称为栈帧

1
2
3
4
5
6
7
8
9
10
11
12
13
高地址
┌─────────────────────────┐
│ 调用者的栈帧 │
├─────────────────────────┤
│ 返回地址 │ ← call 指令压入
├─────────────────────────┤ ← 函数序言开始
│ 保存的 rbp │ ← pushq %rbp
│ 保存的 r12、r13 等 │ ← 若函数用到了这些 callee-saved 寄存器
│ 局部变量 │ ← 编译器分配空间
├─────────────────────────┤
│ (被调函数的栈帧) │
rsp ───────────────────── ← 始终指向当前栈顶
低地址

函数返回时,尾声代码弹出所有已保存的寄存器,恢复 rsp,执行 ret 跳回调用者,栈帧随之消失。

函数调用链中,每一层都在栈上留下自己的帧。以协程内的调用链 入口函数 → A → B → co_sleep 为例,挂起时栈的状态是各层帧的叠加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
高地址(栈底)
┌─────────────────────────┐
│ 入口函数的栈帧 │
├─────────────────────────┤
│ A 的栈帧(含 A 的局部变量、A 保存的寄存器)│
├─────────────────────────┤
│ B 的栈帧(含 B 的局部变量、B 保存的寄存器)│
├─────────────────────────┤
│ co_sleep 的栈帧 │
├─────────────────────────┤
│ jump_fcontext 压入的 │ ← 挂起时额外保存的 6 个寄存器快照
│ 6 个 callee-saved 寄存器│
rsp ───────────────────── ← co->ctx 记录此处
低地址

A 和 B 的局部变量、它们在序言中保存的寄存器,已经作为各自栈帧的一部分存在于栈上。jump_fcontext 只需在此基础上再压入 6 个寄存器的当前快照,然后将 rsp 存入 co->ctx,协程的完整现场就被保存下来了。恢复时,只需将 rsp 恢复到 co->ctx 记录的位置,再依次弹出,整条调用链便会沿原路展开,从挂起点继续执行。

协程的实现细节

协程的实现核心是两个函数:make_fcontext 负责初始化一个新协程的栈,jump_fcontext 负责在两个协程之间完成实际的切换。两者都通过直接操作寄存器和栈内存来实现,在用户态完成,不涉及任何系统调用。

初始化协程栈:make_fcontext

当通过 co_create 创建一个新协程时,需要为其准备好一个可以被 jump_fcontext 直接切入的初始状态。make_fcontext 做的事情可以理解为”伪造一份快照”——在新分配的协程栈上手动填入一套初始寄存器值,使其看起来和一个”已经暂停过的协程”完全一致,这样 jump_fcontext 在第一次切入时就能用统一的方式处理新协程和已暂停协程。

make_fcontext 接受协程栈的高地址端和入口函数地址,在栈顶写入如下结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
高地址(栈底)
┌──────────────────┐
│ ...(未使用) │
├──────────────────┤ ← make_fcontext 返回的 fcontext(初始 rsp)
│ FPU 槽 = 0 │ + 0x00
│ r12 = 0 │ + 0x08
│ r13 = 0 │ + 0x10
│ r14 = 0 │ + 0x18
│ r15 = 0 │ + 0x20
│ rbx = 0 │ + 0x28
│ rbp = 0 │ + 0x30
│ &finish │ + 0x38 ← 入口函数 return 后的去处
│ &入口函数 │ + 0x40 ← 第一次切入时跳转的目标
└──────────────────┘
低地址(栈顶方向)

其中 finish 是一个兜底标签,指向调用 _exit(0) 的代码。如果协程的入口函数执行完后直接 return,控制流会落到 finish,进程退出。在正常的协程实现中,入口函数执行完后应当主动调用 jump_fcontext 切回调度器,而不是依赖 returnfinish 仅作为安全兜底。

对应的汇编实现如下。核心逻辑是:在传入的高地址端向低地址分配 72 字节(0x48),然后按照上方的布局填入初始值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
make_fcontext:
movq %rdi, %rax ; rax = sp(传入的栈高地址端)
andq $-16, %rax ; 对齐到 16 字节边界(末 4 位清零)
leaq -0x48(%rax), %rax ; rax -= 72,向低地址分配 72 字节空间

movq $0, 0x00(%rax) ; FPU 槽 = 0(预留位置,不使用)
movq $0, 0x08(%rax) ; r12 = 0 ┐
movq $0, 0x10(%rax) ; r13 = 0 │
movq $0, 0x18(%rax) ; r14 = 0 ├─ 初始 callee-saved 寄存器值全为 0
movq $0, 0x20(%rax) ; r15 = 0 │
movq $0, 0x28(%rax) ; rbx = 0 │
movq $0, 0x30(%rax) ; rbp = 0 ┘

movq %rdx, 0x38(%rax) ; [+0x38] = fn(首次切入时 jmp 到这里)
leaq finish(%rip), %rcx
movq %rcx, 0x40(%rax) ; [+0x40] = &finish(fn return 后的返回地址)

ret ; 返回 rax,即新协程的初始 fcontext

finish:
xorq %rdi, %rdi
call _exit@PLT ; fn 若直接 return,进程退出(安全兜底)

执行上下文切换:jump_fcontext

jump_fcontext(ofc, nfc, vp) 完成从当前协程到目标协程的切换,整个过程分为三步:

第一步:保存当前协程的状态。 将 6 个 callee-saved 寄存器(rbp, rbx, r12~r15)依次压入当前栈,再将此时的 rsp 值写入 *ofc。至此,当前协程的”快照”就完整地存在了它自己的栈上,*ofc 记录了快照从哪里开始。

这里只保存 callee-saved 寄存器,而不保存全部寄存器,是由编译器和调用约定共同保证的:caller-saved 寄存器(如 raxrdx)在进行任何函数调用前,编译器就已经将其中有用的值保存到栈帧里,jump_fcontext 无需重复处理。

第二步:切换栈。rsp 直接替换为 nfc(目标协程上次保存的 rsp 值)。这一行是整个切换的核心——rsp 一旦改变,CPU 就”活在”了目标协程的栈空间里。

第三步:恢复目标协程的状态。 从目标协程的栈上依次 pop 出 6 个寄存器,最后 pop 出目标协程的 PC(程序计数器),通过 jmp 跳转过去。如果目标协程是首次被切入,PC 就是 make_fcontext 写入的入口函数地址;如果是从暂停处恢复,PC 就是目标协程上次调用 jump_fcontext 的返回地址,从断点处继续执行。

对应的汇编实现如下,三个阶段与上述描述一一对应:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
jump_fcontext:
; === 第一步:保存当前协程 ===
pushq %rbp ; 将 6 个 callee-saved 寄存器依次压入当前栈
pushq %rbx
pushq %r15
pushq %r14
pushq %r13
pushq %r12
leaq -0x8(%rsp), %rsp ; rsp -= 8,为 FPU 槽预留空间(不保存内容)
movq %rsp, (%rdi) ; *ofc = rsp,将当前栈顶存入 ofc(快照的起点)

; === 第二步:切换到目标协程 ===
movq %rsi, %rsp ; rsp = nfc,一行换栈,CPU 进入目标协程的世界

; === 第三步:恢复目标协程 ===
leaq 0x8(%rsp), %rsp ; 跳过 FPU 槽
popq %r12 ; 从目标协程的栈上依次弹出 6 个 callee-saved 寄存器
popq %r13
popq %r14
popq %r15
popq %rbx
popq %rbp
popq %r8 ; 弹出目标协程的 PC(首次进入为 fn 地址,恢复则为断点地址)
movq %rdx, %rax ; rax = vp(设置函数返回值)
movq %rdx, %rdi ; rdi = vp(若首次进入,作为入口函数的第一个参数)
jmp *%r8 ; 跳到目标协程的 PC,恢复执行

调度器

调度器维护两个队列:ready_queue 存放可以立即运行的协程,sleep_queue 存放正在等待计时的协程。调度循环持续执行以下步骤:

  1. 检查 sleep_queue,将唤醒时间已到的协程移入 ready_queue

  2. ready_queue 取出队头协程,调用 jump_fcontext 切入执行。协程让出或执行完毕后,控制流回到此处,继续下一轮循环。

  3. ready_queue 为空但 sleep_queue 非空,说明所有协程都在等待计时,调度器通过 nanosleep 休眠到最近的唤醒时间,避免忙等。

  4. 两个队列均为空时,退出循环。

调度器本身运行在线程栈上,其上下文保存在 main_ctx 中。每次切入协程时,jump_fcontext 将线程栈的 rsp 存入 main_ctx;每次协程让出时,jump_fcontext 再用 main_ctxrsp 切回线程栈,调度器从上次切出的位置继续运行。

调度循环的核心实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static void schedule(Scheduler* sched) {
while (true) {
// 1. 将到期的睡眠协程移入 ready_queue
int64_t now = now_us();
for (auto it = sched->sleep_queue.begin(); it != sched->sleep_queue.end(); ) {
if ((*it)->wake_time_us <= now) {
(*it)->state = CO_READY;
sched->ready_queue.push_back(*it);
it = sched->sleep_queue.erase(it);
} else {
++it;
}
}

// 2. 切入下一个可运行的协程
if (!sched->ready_queue.empty()) {
Coroutine* next = sched->ready_queue.front();
sched->ready_queue.pop_front();
next->state = CO_RUNNING;
jump_fcontext(&sched->main_ctx, next->ctx, 0); // ← 切出;协程让出后从此处继续
if (next->state == CO_DEAD) { free(next->stack); delete next; }
continue;
}

// 3. 无可运行协程:休眠到最近的唤醒时间,避免忙等
if (!sched->sleep_queue.empty()) {
int64_t min_wake = sched->sleep_queue[0]->wake_time_us;
for (auto* co : sched->sleep_queue)
if (co->wake_time_us < min_wake) min_wake = co->wake_time_us;
int64_t sleep_us = min_wake - now_us();
if (sleep_us > 0) {
struct timespec ts = { sleep_us / 1000000, (sleep_us % 1000000) * 1000 };
nanosleep(&ts, nullptr);
}
continue;
}

break; // 4. 两个队列均为空,所有协程已完成
}
}

注意第 2 步中 jump_fcontext 那一行:切入协程时在此处”跳出”,协程让出或死亡后控制流回到此处”落地”,整个调度循环就是对这一行的不断重复。

co_sleep 与 co_yield

co_sleep(us) 的实现:将当前协程的唤醒时间记录到 wake_time_us,状态置为 CO_SLEEPING,加入 sleep_queue,然后调用 jump_fcontext 切回调度器。协程在此处暂停,直到调度器在计时到期后将其重新放入 ready_queue 并切回来,jump_fcontext 返回,co_sleep 继续执行后续代码。

1
2
3
4
5
6
void co_sleep(uint64_t us) {
co->wake_time_us = now_us() + us;
co->state = CO_SLEEPING;
sched->sleep_queue.push_back(co);
jump_fcontext(&co->ctx, sched->main_ctx, 0); // 在这里暂停,也在这里恢复
}

co_yield() 的逻辑更简单:将自身重新加入 ready_queue 尾部,然后立即切回调度器。调度器会先运行队列中其他协程,最终再切回来。co_yield 的作用是主动让出本轮执行机会,常用于避免某个协程长期占用线程。

1
2
3
4
5
void co_yield() {
co->state = CO_READY;
sched->ready_queue.push_back(co); // 重新入队,排到队尾
jump_fcontext(&co->ctx, sched->main_ctx, 0); // 在这里暂停,也在这里恢复
}

这两个函数的共同点是:暂停和恢复都发生在 jump_fcontext 这同一行代码上——切出时在此处暂停,切回时从此处继续,对上层调用者来说看起来就像一次普通的函数调用,只是耗时可能比较长。

总结

协程通过为每个函数分配独立的栈,在需要阻塞时将寄存器状态保存到协程栈并切换 rsp,让线程转去执行其他协程,待条件满足后再恢复,从而以极低的切换开销实现单线程并发,避免阻塞造成的线程资源浪费。