用 Rust 搭建有栈协程

发表于更新于阅读时长 10 分钟

如果有英文版的话标题应该叫 Build Stackful Coroutine With Rust, 本文重度参考此篇教程

完整代码见Github

0. 背景知识

在上一篇博客里对不同种类的协程的区别已经有了充分的探讨, 此处不再赘述. 不过值得一提的是, 为什么 Rust 在 1.0 之前使用了有栈协程, 却最终选择了无栈协程? 在不同的地方有过很多讨论, 总结一下

  1. 有栈协程需要比较大的运行时, 因此不一定适用于所有的场景
  2. 系统线程和有栈协程的 api 很接近, 因此对于定位为系统语言的 rust 来说(有别于可以完全不接触系统线程的其他语言)很容易混淆
  3. 如前一篇博客所述, 无栈协程几乎没有额外的内存开销, 符合 Rust 零成本抽象的哲学

但是我仍然要用 Rust 来实现有栈协程, 因为在各种能方便地嵌入汇编的语言中, Rust 是审美上最好看的

1. 搭建地基

不妨先想想一个协程应该包含什么. 首先, 既然是有栈协程, 那自然应该有个栈. 栈无非就是一段连续的内存空间, 所以大概就是Vec<u8>. 然后根据x86-64 call convention, 还有一些额外的寄存器需要在函数调用前后保持不变(这被称为 callee saved register), 具体说来则是 rbx, rsp, rbp, r12-r15 这七个, 其中 rsp 尤为重要, 它会被用来记录栈顶的地址, 所以

struct Task {
    stack: Vec<u8>,
    ctx: ThreadContext,
}

#[repr(C)]
struct ThreadContext {
    rsp: u64,
    r15: u64,
    r14: u64,
    r13: u64,
    r12: u64,
    rbx: u64,
    rbp: u64,
}

因为 Rust 并不存在稳定的 ABI, 所以这里要用repr(C)把 context 的布局定好, 方便后面汇编使用, 之后再搭一个简易的 runtime

pub struct Runtime {
    current: Task,
    tasks: VecDeque<Task>,
}

current 用来记录当前正在运行的协程, tasks 则是等待中的协程. 再来实现一个简单的调度器

impl Runtime {
    fn t_yield(&mut self) -> bool {
        let mut queue = self.queue;
        if self.queue.len() == 0 {
            return false;
        }
        let mut next = self.queue.pop_front().unwrap();
        mem::swap(&mut next, &mut self.current);
        queue.push_back(next);

        unsafe {
            let last = self.queue.len() - 1;
            switch(&mut queue[last].ctx, &mut self.current.ctx);
        }

        true
    }
}

这个实现非常简单, 只不过是把当前协程塞到队尾, 再从队头取一个协程出来运行而已. 然而实现了协程切换的switch函数才是真正有价值的地方

2. HERE COMES THE DRAGON

不如先从理论的角度考虑一下要怎么实现协程切换. 概括一点讲就是把旧的 context 存起来, 把新的 context 换上去. context 里具体的内容则是当前的栈, 指示栈顶位置的 rsp 和前文提及的另外 6 个不变寄存器, 以及记录下一条指令是什么的 rip. 然而在 x86-64 的体系下并没有办法直接读取和写入 rip, 几乎只有一种方式 -- 也就是函数调用 -- 才能做到.

具体说来, call指令会把 rip 压入栈顶, ret指令则是弹出栈顶到 rip. 正是利用这个特性, 才能实现协程的切换.

#[naked]
#[inline(never)]
unsafe fn switch(old: *mut ThreadContext, new: *const ThreadContext) {
    llvm_asm!("
        mov     %rsp, 0x00($0)
        mov     %r15, 0x08($0)
        mov     %r14, 0x10($0)
        mov     %r13, 0x18($0)
        mov     %r12, 0x20($0)
        mov     %rbx, 0x28($0)
        mov     %rbp, 0x30($0)

        mov     0x00($1), %rsp
        mov     0x08($1), %r15
        mov     0x10($1), %r14
        mov     0x18($1), %r13
        mov     0x20($1), %r12
        mov     0x28($1), %rbx
        mov     0x30($1), %rbp
        "
    :
    :"r"(old), "r"(new)
    :
    : "volatile", "alignstack"
    );
}

这里又用到了两个属性, naked要求 rustc 不生成额外的指令(后面会看到它常常不听从这一指示), 否则常常可能覆盖必须保存的七个寄存器,以及inline(never)防止这个函数被内联(要是被内联了那前面干的事就全部白搭).

分析一下整个调用流程, t_yield调用switch, 把下一条指令的地址入栈, 之后前七条指令把当前的 context 存入 old 里, 后七条指令则从 new 里提取出 context, 其中 rsp 指向新的栈顶, 之后执行ret, 把栈顶弹出给 rip, CPU 就去 rip 指向的地方取下一条指令, 新的协程就这样被唤醒了. 注意这七个寄存器都必须是在调用前后保持不变的, 所以如果在switch内部用到了它们, 需要先把原来的值存在栈上, 再写入新的值. 换言之, 我们只希望对switch的调用储存rip, 而不希望它更改t_yield中任何一个寄存器的值.

由此可以看出应当怎样对协程进行初始化:

  1. context 里的 rsp 需要指向栈顶, 也就是task.queue的某个位置
  2. 栈顶必须埋藏着要执行函数的位置, 这样switch函数返回的时候就会执行它

所以有

impl Runtime {
    pub fn spawn(&mut self, f: fn()) {
        let mut task = Task::new();
        let s_ptr = available.stack.as_mut_ptr();
        let size = task.stack.len();

        unsafe {
            ptr::write(s_ptr.offset((size - 0x20) as isize) as *mut u64, f as u64);
            ptr::write(
                s_ptr.offset((size - 0x18) as isize) as *mut u64,
                skip as u64,
            );
            ptr::write(
                s_ptr.offset((size - 0x10) as isize) as *mut u64,
                guard as u64,
            );

            last.ctx.rsp = s_ptr.offset((size - 0x20) as isize) as u64;
        }

        self.queue.push(task);
    }
}

task.stack被当作普通的系统栈来使用, 使用时栈顶地址从高到低减小.

然而有一个很微妙的问题就是栈对齐. 64 位 CPU 一般来说一次性至少取 64 位数据, 因此栈顶必须至少是 8 字节对齐的. 然而, x86-64 和 SSE 指令集出来的时间差不多, 因此生成的代码几乎一定会包含 SSE 指令, 而这样的指令要求地址 16 字节对齐, 所以在堆地址保证 16 字节对齐的前提下, 此处还需要把函数指针放在离数组头的距离是 16 的倍数的位置, 像这样的位置最早是size - 0x10.

那后面两个函数skipguard又是干什么的呢? 当这里的 f 运行结束之后, 我希望他会去唤醒别的协程. 换言之, 我希望在调用完 f 之后调用类似于t_yieldt_return, 而这正是通过把希望它接着调用的函数地址埋在栈顶 - 8 的地方, 这样, f 内部执行到ret之后就自然会去调用它.

skip又有什么用呢? 如前所述, 栈需要维持 16 字节对齐, 所以在保存了 8 字节的返回地址之后, 被调用者往往还需要做一些工作来重新对齐栈. 然而guard并不知道调用它时栈是 16 位对齐的, 所以就会出错. 所以这里要加一个额外的空函数skip, 来优雅地对齐guard的栈.

3. 多核支持

到此为止, 一个有栈协程的雏形就已经完成了. 但是我们 Rustaceans 决不能像 PHP 人一样没有追求, 所以接下来让我们把这个雏形拓展到支持多核. 先把之前的Runtime改叫Machine(系统线程)

pub struct Runtime {
    current: usize,
    machines: Vec<Machine>,
}

pub struct Machine {
    queue: VecDeque<Task>,
    current: Task,
}

impl Runtime {
    /// initialize runtime with machines same numbers as cpu cores
    pub fn new() -> Self {
        let mut machines = Vec::new();
        let cpus = num_cpus::get();
        for _ in 0..cpus {
            machines.push(Machine::new());
        }
        Runtime {
            current: 0,
            machines,
        }
    }
}

在把之前给Machine实现的一套东西再给Runtime实现一遍就好. 此时我们需要记录一个Task是被分配在哪个线程上, 可以使用 Thread Local Storage 完成

#[thread_local]
#[no_mangle]
static mut WORKER_ID: usize = 0;

impl Runtime {
    pub fn run(&mut self) {
        rayon::scope(|s| {
            let mut i = 0;
            for m in self.machines.iter_mut() {
                s.spawn(move |_| {
                    unsafe { WORKER_ID = i };
                    while m.t_yield() {}
                });
                i += 1;
            }
        })
    }
}

既然都走到这里了, 不如再接着实现一下任务窃取, 乍一看并不复杂, 不就是协程完结时查看当前队列是否为空, 如果是就从别的队列里拿一些任务过来吗? 然而涉及到线程同步的东西从来就没有简单的. 先把队列都装进锁里

pub struct Machine {
    queue: Mutex<VecDeque<Task>>,
    current: Task,
}

然后改造一下t_yield

impl Machine {
    fn t_yield(&mut self) -> bool {
        let mut queue = self.queue.lock().unwrap();
        // ...
        unsafe {
            let last = self.queue.len() - 1;
            switch(&mut queue[last].ctx, &mut self.current.ctx);
        }
        true
    }
}

然后跑一下. Surprise! 死锁了. 为什么会这样呢? Rust 广泛地使用了 RAII 机制, 所以只有 queue 这个变量被 drop 掉的时候才会释放这个锁. 然而正是switch里执行流程被修改了, 因此后面的指令直到新协程 yield 之后都不会被执行! 那思来想去, 只能在 switch 内释放锁

unsafe fn switch(old: *mut ThreadContext, new: *const ThreadContext,
    label: MutexGuard<VecDeque<Task>>) {
    mem::drop(label)
    // ...
}

再跑一下试试! Segment fault, 为什么? 查看汇编代码, 可以发现 rustc 额外生成了一句mov %rdx, %rbx, 于是后面的执行都出了问题.

好吧, 那怎么办? 回想一下, 之所以要把这个函数设置成 naked, 是为了避免它去写那七个寄存器使得存在ctx里的值无效, 那么只要把这一步拆出来

unsafe fn switch_old(old: *mut ThreadContext)
unsafe fn switch_new(old: *mut ThreadContext, new: *const ThreadContext,
    label: MutexGuard<VecDeque<Task>>) {
    mem::drop(label)
    // ...
}

就可以了. 注意 old 里只能保存剩下 6 个寄存器, 而存rsp只能在 new 里存. 再跑一下, 可以发现还是会 Segment fault, 这是因为在 naked 函数里不会自动保存寄存器, 所以调用别的函数可能会修改它们. 所以还需要手动保存rsi的值(在 call convention 中被用作传递第二个参数的, 也就是这里的 new).

这下终于能跑通了.

4. 还差什么?

对比一下有栈协程的标杆 Go 语言, 还差些什么? 关于 goroutine 的实现, 这个视频是迄今为止最好的讲解(虽然演讲者有很重的俄语口音), 相较之下, 这个实现还欠缺什么呢?

  1. 显而易见地, 协程版本的各种系统调用(举例说来, 各种 IO), 不过这只是体力活
  2. 更精细的调度器, 比如考虑到公平性
  3. 更重要的是, 空间可增长的栈

内存是非常宝贵的, 所以第三点是保证有栈协程性能的关键之一. 然而支持它需要编译器的工作, 因此本文将不包含这一部分的实现. 不过不妨看看 Go 是怎么做的.

如果一个资源不够用了, 那么很自然的解决方案就是再去搞一点来. 这样的解决方案被称为分段栈, 使用这种它的有早期版本的 Go 和 开了 --fsplit-stack 的 c 系语言. 在这种解决方案里, 如果编译器觉得栈空间不够大, 比如 foo 函数 调用 bar 函数时, 那就写入指令向 OS 再申请一些内存, 之后 bar 的栈帧就分配在新的空间上, 等 bar 返回时再回收这个空间. 这样的话栈空间就是不连续的, 所以叫做"分段".

这种方案的问题是非常明显的: 计算机中如果想要高性能, 那局域性(locality)是很重要的. 分段栈就像链表一样, 并不能满足这一点. 更重要的问题是 hot split: 试想, 如果前面提到的调用 bar 的位置是在一个很长的循环里, 这样每个循环都要经历分配新空间 -> 回收新空间的过程. 毫无疑问这样会对性能产生影响.

那么 Go 是怎么解决的呢? Go 后来采取了可增长栈(growable stack)方案. 这种做法和 Rust 里的Vec满了之后的做法差不多:

  1. 申请一块比原先更大的内存
  2. 把当前的所有栈内容都拷贝过去
  3. 把所有指向当前栈上内容的指针都调整一遍
  4. 释放原来的空间

3 是不是听起来有些耳熟? 没错, 这一步和 GC 里回收空间之后调整指针的做法差不多. 那么一般的 GC 是怎么实现的呢? 可以分为两类

首先是保守式. 这种做法里, 如果一个值看起来像是指向堆的指针, 比如指向的地址在堆上并且满足对齐要求等等, 那就认为它是指针. 这样的实现里无法知道一个值到底是不是指针, 所以不能移动对象. 一种补救方式是使用指向指针的指针, 所有指向堆的地址都指向某一个 handle 指针, handle 指针指向的内容才是真的堆地址, 而 handle 指向的地址可以随便调整.

另一种则是精确式. 在前面一节里, 可以看到只有知道一些额外信息才能实现对象的移动. 在精确式 GC 里, 编译器会记录当前所有指针类型的数据(使用 bitmap 之类的手段), 在 GC 回收空间时, 运行时去查询记录就能知道要调整什么了.

显然 Go 的可增长栈采取了和精确式 GC 一样的方案, 但非常迷惑的是即使有着这样的基础设施 Go 并没有实现有 Compact 功能的 GC.

© 2016 - 2022Austaras Devas