用 Rust 搭建有栈协程
发表于更新于阅读时长 9 分钟如果有英文版的话标题应该叫 Build Stackful Coroutine With Rust,本文重度参考此篇教程。
完整代码见Github。
0. 背景知识
在上一篇博客里对不同种类的协程的区别已经有了充分的探讨,此处不再赘述。不过值得一提的是,为什么 Rust 在 1.0 之前使用了有栈协程,却最终选择了无栈协程? 在不同的地方有过很多讨论,总结一下
- 有栈协程需要比较大的运行时,因此不一定适用于所有的场景
- 系统线程和有栈协程的 api 很接近,因此对于定位为系统语言的 rust 来说(有别于可以完全不接触系统线程的其他语言)很容易混淆
- 如前一篇博客所述,无栈协程几乎没有额外的内存开销,符合 Rust 零成本抽象的哲学
但是我仍然要用 Rust 来实现有栈协程,因为在各种能方便地嵌入汇编的语言中,Rust 是审美上最好看的
1. 搭建地基
不妨先想想一个协程应该包含什么。首先,既然是有栈协程,那自然应该有个栈。栈无非就是一段连续的内存空间,所以大概就是Vec<u8>
。然后根据x86-64 call convention,还有一些额外的寄存器需要在函数调用前后保持不变(这被称为 callee saved register),具体说来则是 rbx,rsp,rbp,r12-r15 这七个,其中 rsp 尤为重要,它会被用来记录栈顶的地址,所以
rust
struct Task {stack: Vec<u8>,ctx: ThreadContext,}#[repr(C)]struct ThreadContext {rsp: u64,r15: u64,r14: u64,r13: u64,r12: u64,rbx: u64,rbp: u64,}
因为 Rust 并不存在稳定的 ABI,所以这里要用repr(C)
把 context 的布局定好,方便后面汇编使用。之后再搭一个简易的 runtime
rust
pub struct Runtime {current: Task,tasks: VecDeque<Task>,}
current 用来记录当前正在运行的协程,tasks 则是等待中的协程。再来实现一个简单的调度器
rust
impl Runtime {fn t_yield(&mut self) -> bool {let mut queue = self.queue;if self.queue.len() == 0 {return false;}let mut next = self.queue.pop_front().unwrap();mem::swap(&mut next, &mut self.current);queue.push_back(next);unsafe {let last = self.queue.len() - 1;switch(&mut queue[last].ctx, &mut self.current.ctx);}true}}
这个实现非常简单,只不过是把当前协程塞到队尾,再从队头取一个协程出来运行而已。然而实现了协程切换的switch
函数才是真正有价值的地方。
2. HERE COMES THE DRAGON
不如先从理论的角度考虑一下要怎么实现协程切换。概括一点讲就是把旧的 context 存起来,把新的 context 换上去。context 里具体的内容则是当前的栈,指示栈顶位置的 rsp 和前文提及的另外 6 个不变寄存器,以及记录下一条指令是什么的 rip。然而在 x86-64 的体系下并没有办法直接读取和写入 rip,几乎只有一种方式 -- 也就是函数调用 -- 才能做到。
具体说来,call
指令会把 rip 压入栈顶,ret
指令则是弹出栈顶到 rip。正是利用这个特性,才能实现协程的切换。
rust
#[naked]#[inline(never)]unsafe fn switch(old: *mut ThreadContext, new: *const ThreadContext) {llvm_asm!("mov %rsp, 0x00($0)mov %r15, 0x08($0)mov %r14, 0x10($0)mov %r13, 0x18($0)mov %r12, 0x20($0)mov %rbx, 0x28($0)mov %rbp, 0x30($0)mov 0x00($1), %rspmov 0x08($1), %r15mov 0x10($1), %r14mov 0x18($1), %r13mov 0x20($1), %r12mov 0x28($1), %rbxmov 0x30($1), %rbp"::"r"(old), "r"(new):: "volatile","alignstack");}
这里又用到了两个属性,naked
要求 rustc 不生成额外的指令(后面会看到它常常不听从这一指示),否则常常可能覆盖必须保存的七个寄存器,以及inline(never)
防止这个函数被内联(要是被内联了那前面干的事就全部白搭)。
分析一下整个调用流程,t_yield调用switch,把下一条指令的地址入栈,之后前七条指令把当前的 context 存入 old 里,后七条指令则从 new 里提取出 context,其中 rsp 指向新的栈顶,之后执行ret
,把栈顶弹出给 rip,CPU 就去 rip 指向的地方取下一条指令,新的协程就这样被唤醒了。注意这七个寄存器都必须是在调用前后保持不变的,所以如果在switch内部用到了它们,需要先把原来的值存在栈上,再写入新的值。换言之,我们只希望对switch
的调用储存 rip,而不希望它更改t_yield
中任何一个寄存器的值。
由此可以看出应当怎样对协程进行初始化:
- context 里的 rsp 需要指向栈顶,也就是
task.queue
的某个位置 - 栈顶必须埋藏着要执行函数的位置,这样
switch
函数返回的时候就会执行它
所以有
rust
impl Runtime {pub fn spawn(&mut self, f: fn()) {let mut task = Task::new();let s_ptr = available.stack.as_mut_ptr();let size = task.stack.len();unsafe {ptr::write(s_ptr.offset((size - 0x20) as isize) as *mut u64,f as u64);ptr::write(s_ptr.offset((size - 0x18) as isize) as *mut u64,skip as u64,);ptr::write(s_ptr.offset((size - 0x10) as isize) as *mut u64,guard as u64,);last.ctx.rsp = s_ptr.offset((size - 0x20) as isize) as u64;}self.queue.push(task);}}
task.stack
被当作普通的系统栈来使用,使用时栈顶地址从高到低减小。
然而有一个很微妙的问题就是栈对齐。64 位 CPU 一般来说一次性至少取 64 位数据,因此栈顶必须至少是 8 字节对齐的。然而,x86-64 和 SSE 指令集出来的时间差不多,因此生成的代码几乎一定会包含 SSE 指令,而这样的指令要求地址 16 字节对齐,所以在堆地址保证 16 字节对齐的前提下,此处还需要把函数指针放在离数组头的距离是 16 的倍数的位置,像这样的位置最早是size - 0x10
。
那后面两个函数skip
和guard
又是干什么的呢? 当这里的 f 运行结束之后,我希望他会去唤醒别的协程。换言之,我希望在调用完f
之后调用类似于t_yield
的t_return
,而这正是通过把希望它接着调用的函数地址埋在栈顶 - 8 的地方,这样,f
内部执行到ret
之后就自然会去调用它.
skip
又有什么用呢? 如前所述,栈需要维持 16 字节对齐,所以在保存了 8 字节的返回地址之后,被调用者往往还需要做一些工作来重新对齐栈。然而guard并不知道调用它时栈是 16 位对齐的,所以就会出错。所以这里要加一个额外的空函数skip
,来优雅地对齐guard
的栈.
3. 多核支持
到此为止,一个有栈协程的雏形就已经完成了。但是我们 Rustaceans 决不能像 PHP 人一样没有追求,所以接下来让我们把这个雏形拓展到支持多核。先把之前的Runtime
改叫Machine
(系统线程)
rust
pub struct Runtime {current: usize,machines: Vec<Machine>,}pub struct Machine {queue: VecDeque<Task>,current: Task,}impl Runtime {/// initialize runtime with machines same numbers as cpu corespub fn new() -> Self {let mut machines = Vec::new();let cpus = num_cpus::get();for _ in 0..cpus {machines.push(Machine::new());}Runtime {current: 0,machines,}}}
在把之前给Machine
实现的一套东西再给Runtime
实现一遍就好。此时我们需要记录一个Task
是被分配在哪个线程上,可以使用 Thread Local Storage 完成
rust
#[thread_local]#[no_mangle]static mut WORKER_ID: usize = 0;impl Runtime {pub fn run(&mut self) {rayon::scope(|s| {let mut i = 0;for m in self.machines.iter_mut() {s.spawn(move |_| {unsafe { WORKER_ID = i };while m.t_yield() {}});i += 1;}})}}
既然都走到这里了,不如再接着实现一下任务窃取,乍一看并不复杂,不就是协程完结时查看当前队列是否为空,如果是就从别的队列里拿一些任务过来吗? 然而涉及到线程同步的东西从来就没有简单的。先把队列都装进锁里
rust
pub struct Machine {queue: Mutex<VecDeque<Task>>,current: Task,}
然后改造一下t_yield
rust
impl Machine {fn t_yield(&mut self) -> bool {let mut queue = self.queue.lock().unwrap();// ...unsafe {let last = self.queue.len() - 1;switch(&mut queue[last].ctx, &mut self.current.ctx);}true}}
然后跑一下。Surprise! 死锁了。为什么会这样呢? Rust 广泛地使用了 RAII 机制,所以只有 queue 这个变量被 drop 掉的时候才会释放这个锁。然而正是switch里执行流程被修改了,因此后面的指令直到新协程yield
之后都不会被执行! 那思来想去,只能在switch
内释放锁
rust
unsafe fn switch(old: *mut ThreadContext, new: *const ThreadContext,label: MutexGuard<VecDeque<Task>>) {mem::drop(label)// ...}
再跑一下试试! Segment fault,为什么? 查看汇编代码,可以发现 rustc 额外生成了一句mov %rdx,%rbx
,于是后面的执行都出了问题.
好吧,那怎么办? 回想一下,之所以要把这个函数设置成 naked,是为了避免它去写那七个寄存器使得存在ctx里的值无效,那么只要把这一步拆出来
rust
unsafe fn switch_old(old: *mut ThreadContext)unsafe fn switch_new(old: *mut ThreadContext, new: *const ThreadContext,label: MutexGuard<VecDeque<Task>>) {mem::drop(label)// ...}
就可以了。注意 old 里只能保存剩下 6 个寄存器,而存 rsp 只能在 new 里存。再跑一下,可以发现还是会 Segment fault,这是因为在 naked 函数里不会自动保存寄存器,所以调用别的函数可能会修改它们。所以还需要手动保存rsi的值(在 call convention 中被用作传递第二个参数的,也就是这里的 new)。
这下终于能跑通了。
4. 还差什么?
对比一下有栈协程的标杆 Go 语言,还差些什么? 关于 goroutine 的实现,这个视频是迄今为止最好的讲解(虽然演讲者有很重的俄语口音),相较之下,这个实现还欠缺什么呢?
- 显而易见地,协程版本的各种系统调用(举例说来,各种 IO),不过这只是体力活
- 更精细的调度器,比如考虑到公平性
- 更重要的是,空间可增长的栈
内存是非常宝贵的,所以第三点是保证有栈协程性能的关键之一。然而支持它需要编译器的工作,因此本文将不包含这一部分的实现。不过不妨看看 Go 是怎么做的。
如果一个资源不够用了,那么很自然的解决方案就是再去搞一点来。这样的解决方案被称为分段栈,使用这种它的有早期版本的 Go 和 开了 --fsplit-stack 的 c 系语言。在这种解决方案里,如果编译器觉得栈空间不够大,比如 foo 函数 调用 bar 函数时,那就写入指令向 OS 再申请一些内存,之后 bar 的栈帧就分配在新的空间上,等 bar 返回时再回收这个空间。这样的话栈空间就是不连续的,所以叫做"分段"。
这种方案的问题是非常明显的: 计算机中如果想要高性能,那局域性(locality)是很重要的。分段栈就像链表一样,并不能满足这一点。更重要的问题是 hot split: 试想,如果前面提到的调用 bar 的位置是在一个很长的循环里,这样每个循环都要经历分配新空间 -> 回收新空间的过程。毫无疑问这样会对性能产生影响。
那么 Go 是怎么解决的呢? Go 后来采取了可增长栈(growable stack)方案。这种做法和 Rust 里的Vec
满了之后的做法差不多:
- 申请一块比原先更大的内存
- 把当前的所有栈内容都拷贝过去
- 把所有指向当前栈上内容的指针都调整一遍
- 释放原来的空间
3 是不是听起来有些耳熟? 没错,这一步和 GC 里回收空间之后调整指针的做法差不多。那么一般的 GC 是怎么实现的呢? 可以分为两类
首先是保守式。这种做法里,如果一个值看起来像是指向堆的指针,比如指向的地址在堆上并且满足对齐要求等等,那就认为它是指针。这样的实现里无法知道一个值到底是不是指针,所以不能移动对象。一种补救方式是使用指向指针的指针,所有指向堆的地址都指向某一个 handle 指针,handle 指针指向的内容才是真的堆地址,而 handle 指向的地址可以随便调整。
另一种则是精确式。在前面一节里,可以看到只有知道一些额外信息才能实现对象的移动。在精确式 GC 里,编译器会记录当前所有指针类型的数据(使用 bitmap 之类的手段),在 GC 回收空间时,运行时去查询记录就能知道要调整什么了。
显然 Go 的可增长栈采取了和精确式 GC 一样的方案,但非常迷惑的是即使有着这样的基础设施 Go 并没有实现有 Compact 功能的 GC。