用 Rust 搭建有栈协程

发表于更新于阅读时长 9 分钟

如果有英文版的话标题应该叫 Build Stackful Coroutine With Rust,本文重度参考此篇教程

完整代码见Github

0. 背景知识

在上一篇博客里对不同种类的协程的区别已经有了充分的探讨,此处不再赘述。不过值得一提的是,为什么 Rust 在 1.0 之前使用了有栈协程,却最终选择了无栈协程? 在不同的地方有过很多讨论,总结一下

  1. 有栈协程需要比较大的运行时,因此不一定适用于所有的场景
  2. 系统线程和有栈协程的 api 很接近,因此对于定位为系统语言的 rust 来说(有别于可以完全不接触系统线程的其他语言)很容易混淆
  3. 如前一篇博客所述,无栈协程几乎没有额外的内存开销,符合 Rust 零成本抽象的哲学

但是我仍然要用 Rust 来实现有栈协程,因为在各种能方便地嵌入汇编的语言中,Rust 是审美上最好看的

1. 搭建地基

不妨先想想一个协程应该包含什么。首先,既然是有栈协程,那自然应该有个栈。栈无非就是一段连续的内存空间,所以大概就是Vec<u8>。然后根据x86-64 call convention,还有一些额外的寄存器需要在函数调用前后保持不变(这被称为 callee saved register),具体说来则是 rbx,rsp,rbp,r12-r15 这七个,其中 rsp 尤为重要,它会被用来记录栈顶的地址,所以

struct Task {
    stack: Vec<u8>,
    ctx: ThreadContext,
}

#[repr(C)]
struct ThreadContext {
    rsp: u64,
    r15: u64,
    r14: u64,
    r13: u64,
    r12: u64,
    rbx: u64,
    rbp: u64,
}

因为 Rust 并不存在稳定的 ABI,所以这里要用repr(C)把 context 的布局定好,方便后面汇编使用。之后再搭一个简易的 runtime

pub struct Runtime {
    current: Task,
    tasks: VecDeque<Task>,
}

current 用来记录当前正在运行的协程,tasks 则是等待中的协程。再来实现一个简单的调度器

impl Runtime {
    fn t_yield(&mut self) -> bool {
        let mut queue = self.queue;
        if self.queue.len() == 0 {
            return false;
        }
        let mut next = self.queue.pop_front().unwrap();
        mem::swap(&mut next, &mut self.current);
        queue.push_back(next);

        unsafe {
            let last = self.queue.len() - 1;
            switch(&mut queue[last].ctx, &mut self.current.ctx);
        }

        true
    }
}

这个实现非常简单,只不过是把当前协程塞到队尾,再从队头取一个协程出来运行而已。然而实现了协程切换的switch函数才是真正有价值的地方。

2. HERE COMES THE DRAGON

不如先从理论的角度考虑一下要怎么实现协程切换。概括一点讲就是把旧的 context 存起来,把新的 context 换上去。context 里具体的内容则是当前的栈,指示栈顶位置的 rsp 和前文提及的另外 6 个不变寄存器,以及记录下一条指令是什么的 rip。然而在 x86-64 的体系下并没有办法直接读取和写入 rip,几乎只有一种方式 -- 也就是函数调用 -- 才能做到。

具体说来,call指令会把 rip 压入栈顶,ret指令则是弹出栈顶到 rip。正是利用这个特性,才能实现协程的切换。

#[naked]
#[inline(never)]
unsafe fn switch(old: *mut ThreadContext, new: *const ThreadContext) {
    llvm_asm!("
        mov     %rsp, 0x00($0)
        mov     %r15, 0x08($0)
        mov     %r14, 0x10($0)
        mov     %r13, 0x18($0)
        mov     %r12, 0x20($0)
        mov     %rbx, 0x28($0)
        mov     %rbp, 0x30($0)

        mov     0x00($1), %rsp
        mov     0x08($1), %r15
        mov     0x10($1), %r14
        mov     0x18($1), %r13
        mov     0x20($1), %r12
        mov     0x28($1), %rbx
        mov     0x30($1), %rbp
        "
    :
    :"r"(old), "r"(new)
    :
    : "volatile","alignstack"
    );
}

这里又用到了两个属性,naked要求 rustc 不生成额外的指令(后面会看到它常常不听从这一指示),否则常常可能覆盖必须保存的七个寄存器,以及inline(never)防止这个函数被内联(要是被内联了那前面干的事就全部白搭)。

分析一下整个调用流程,t_yield调用switch,把下一条指令的地址入栈,之后前七条指令把当前的 context 存入 old 里,后七条指令则从 new 里提取出 context,其中 rsp 指向新的栈顶,之后执行ret,把栈顶弹出给 rip,CPU 就去 rip 指向的地方取下一条指令,新的协程就这样被唤醒了。注意这七个寄存器都必须是在调用前后保持不变的,所以如果在switch内部用到了它们,需要先把原来的值存在栈上,再写入新的值。换言之,我们只希望对switch的调用储存 rip,而不希望它更改t_yield中任何一个寄存器的值。

由此可以看出应当怎样对协程进行初始化:

  1. context 里的 rsp 需要指向栈顶,也就是task.queue的某个位置
  2. 栈顶必须埋藏着要执行函数的位置,这样switch函数返回的时候就会执行它

所以有

impl Runtime {
    pub fn spawn(&mut self, f: fn()) {
        let mut task = Task::new();
        let s_ptr = available.stack.as_mut_ptr();
        let size = task.stack.len();

        unsafe {
            ptr::write(s_ptr.offset((size - 0x20) as isize) as *mut u64,f as u64);
            ptr::write(
                s_ptr.offset((size - 0x18) as isize) as *mut u64,
                skip as u64,
            );
            ptr::write(
                s_ptr.offset((size - 0x10) as isize) as *mut u64,
                guard as u64,
            );

            last.ctx.rsp = s_ptr.offset((size - 0x20) as isize) as u64;
        }

        self.queue.push(task);
    }
}

task.stack被当作普通的系统栈来使用,使用时栈顶地址从高到低减小。

然而有一个很微妙的问题就是栈对齐。64 位 CPU 一般来说一次性至少取 64 位数据,因此栈顶必须至少是 8 字节对齐的。然而,x86-64 和 SSE 指令集出来的时间差不多,因此生成的代码几乎一定会包含 SSE 指令,而这样的指令要求地址 16 字节对齐,所以在堆地址保证 16 字节对齐的前提下,此处还需要把函数指针放在离数组头的距离是 16 的倍数的位置,像这样的位置最早是size - 0x10

那后面两个函数skipguard又是干什么的呢? 当这里的 f 运行结束之后,我希望他会去唤醒别的协程。换言之,我希望在调用完f之后调用类似于t_yieldt_return,而这正是通过把希望它接着调用的函数地址埋在栈顶 - 8 的地方,这样,f内部执行到ret之后就自然会去调用它.

skip又有什么用呢? 如前所述,栈需要维持 16 字节对齐,所以在保存了 8 字节的返回地址之后,被调用者往往还需要做一些工作来重新对齐栈。然而guard并不知道调用它时栈是 16 位对齐的,所以就会出错。所以这里要加一个额外的空函数skip,来优雅地对齐guard的栈.

3. 多核支持

到此为止,一个有栈协程的雏形就已经完成了。但是我们 Rustaceans 决不能像 PHP 人一样没有追求,所以接下来让我们把这个雏形拓展到支持多核。先把之前的Runtime改叫Machine(系统线程)

pub struct Runtime {
    current: usize,
    machines: Vec<Machine>,
}

pub struct Machine {
    queue: VecDeque<Task>,
    current: Task,
}

impl Runtime {
    /// initialize runtime with machines same numbers as cpu cores
    pub fn new() -> Self {
        let mut machines = Vec::new();
        let cpus = num_cpus::get();
        for _ in 0..cpus {
            machines.push(Machine::new());
        }
        Runtime {
            current: 0,
            machines,
        }
    }
}

在把之前给Machine实现的一套东西再给Runtime实现一遍就好。此时我们需要记录一个Task是被分配在哪个线程上,可以使用 Thread Local Storage 完成

#[thread_local]
#[no_mangle]
static mut WORKER_ID: usize = 0;

impl Runtime {
    pub fn run(&mut self) {
        rayon::scope(|s| {
            let mut i = 0;
            for m in self.machines.iter_mut() {
                s.spawn(move |_| {
                    unsafe { WORKER_ID = i };
                    while m.t_yield() {}
                });
                i += 1;
            }
        })
    }
}

既然都走到这里了,不如再接着实现一下任务窃取,乍一看并不复杂,不就是协程完结时查看当前队列是否为空,如果是就从别的队列里拿一些任务过来吗? 然而涉及到线程同步的东西从来就没有简单的。先把队列都装进锁里

pub struct Machine {
    queue: Mutex<VecDeque<Task>>,
    current: Task,
}

然后改造一下t_yield

impl Machine {
    fn t_yield(&mut self) -> bool {
        let mut queue = self.queue.lock().unwrap();
        // ...
        unsafe {
            let last = self.queue.len() - 1;
            switch(&mut queue[last].ctx, &mut self.current.ctx);
        }
        true
    }
}

然后跑一下。Surprise! 死锁了。为什么会这样呢? Rust 广泛地使用了 RAII 机制,所以只有 queue 这个变量被 drop 掉的时候才会释放这个锁。然而正是switch里执行流程被修改了,因此后面的指令直到新协程yield之后都不会被执行! 那思来想去,只能在switch内释放锁

unsafe fn switch(old: *mut ThreadContext, new: *const ThreadContext,
    label: MutexGuard<VecDeque<Task>>) {
    mem::drop(label)
    // ...
}

再跑一下试试! Segment fault,为什么? 查看汇编代码,可以发现 rustc 额外生成了一句mov %rdx,%rbx,于是后面的执行都出了问题.

好吧,那怎么办? 回想一下,之所以要把这个函数设置成 naked,是为了避免它去写那七个寄存器使得存在ctx里的值无效,那么只要把这一步拆出来

unsafe fn switch_old(old: *mut ThreadContext)
unsafe fn switch_new(old: *mut ThreadContext, new: *const ThreadContext,
    label: MutexGuard<VecDeque<Task>>) {
    mem::drop(label)
    // ...
}

就可以了。注意 old 里只能保存剩下 6 个寄存器,而存 rsp 只能在 new 里存。再跑一下,可以发现还是会 Segment fault,这是因为在 naked 函数里不会自动保存寄存器,所以调用别的函数可能会修改它们。所以还需要手动保存rsi的值(在 call convention 中被用作传递第二个参数的,也就是这里的 new)。

这下终于能跑通了。

4. 还差什么?

对比一下有栈协程的标杆 Go 语言,还差些什么? 关于 goroutine 的实现,这个视频是迄今为止最好的讲解(虽然演讲者有很重的俄语口音),相较之下,这个实现还欠缺什么呢?

  1. 显而易见地,协程版本的各种系统调用(举例说来,各种 IO),不过这只是体力活
  2. 更精细的调度器,比如考虑到公平性
  3. 更重要的是,空间可增长的栈

内存是非常宝贵的,所以第三点是保证有栈协程性能的关键之一。然而支持它需要编译器的工作,因此本文将不包含这一部分的实现。不过不妨看看 Go 是怎么做的。

如果一个资源不够用了,那么很自然的解决方案就是再去搞一点来。这样的解决方案被称为分段栈,使用这种它的有早期版本的 Go 和 开了 --fsplit-stack 的 c 系语言。在这种解决方案里,如果编译器觉得栈空间不够大,比如 foo 函数 调用 bar 函数时,那就写入指令向 OS 再申请一些内存,之后 bar 的栈帧就分配在新的空间上,等 bar 返回时再回收这个空间。这样的话栈空间就是不连续的,所以叫做"分段"。

这种方案的问题是非常明显的: 计算机中如果想要高性能,那局域性(locality)是很重要的。分段栈就像链表一样,并不能满足这一点。更重要的问题是 hot split: 试想,如果前面提到的调用 bar 的位置是在一个很长的循环里,这样每个循环都要经历分配新空间 -> 回收新空间的过程。毫无疑问这样会对性能产生影响。

那么 Go 是怎么解决的呢? Go 后来采取了可增长栈(growable stack)方案。这种做法和 Rust 里的Vec满了之后的做法差不多:

  1. 申请一块比原先更大的内存
  2. 把当前的所有栈内容都拷贝过去
  3. 把所有指向当前栈上内容的指针都调整一遍
  4. 释放原来的空间

3 是不是听起来有些耳熟? 没错,这一步和 GC 里回收空间之后调整指针的做法差不多。那么一般的 GC 是怎么实现的呢? 可以分为两类

首先是保守式。这种做法里,如果一个值看起来像是指向堆的指针,比如指向的地址在堆上并且满足对齐要求等等,那就认为它是指针。这样的实现里无法知道一个值到底是不是指针,所以不能移动对象。一种补救方式是使用指向指针的指针,所有指向堆的地址都指向某一个 handle 指针,handle 指针指向的内容才是真的堆地址,而 handle 指向的地址可以随便调整。

另一种则是精确式。在前面一节里,可以看到只有知道一些额外信息才能实现对象的移动。在精确式 GC 里,编译器会记录当前所有指针类型的数据(使用 bitmap 之类的手段),在 GC 回收空间时,运行时去查询记录就能知道要调整什么了。

显然 Go 的可增长栈采取了和精确式 GC 一样的方案,但非常迷惑的是即使有着这样的基础设施 Go 并没有实现有 Compact 功能的 GC。

© 2016 - 2024Austaras Devas