用 TypeScript 看无栈协程

用 TypeScript 看无栈协程

发表于更新于阅读时长 6 分钟

如果有英文版的话标题应该叫 Stackless Coroutine,A TypeScript Perspective

众所周知,ES2017 标准中为 JavaScript 规定了 async 函数,它是无栈协程的一种实现。然而考察 JS 引擎的实现是非常困难的,因此本文借助 TypeScript 编译器的转译功能来考察 JS 的 async 的具体实现方式

0. 背景知识

协程(coroutine),纤程(fiber)还有绿色线程(green thread),所指的都是是编程语言中的一类组件,即用户态非抢占性多任务。在拥有这样能力的语言中,该语言的运行时可以挂起/恢复一段程序,使得程序整体获得异步处理的能力。它常被用于提升异步处理的效率(因为众所周知,系统调用是缓慢的,而协程可以和函数调用一样快),也被用来替不支持线程的语言,比如下文中将要讨论的 JavsScript,提供异步能力。

协程可以按照以下 3 个角度进行分类1:

  • 对称/不对称。前者提供让出执行权的原语,后者提供唤起协程和把执行权转交给调用者的原语
  • 第一类/受限。协程是不是第一类对象
  • 有栈/无栈。协程能在调用栈的任意处挂起,还是只能在协程的函数体内挂起

由于本文只讨论协程的实现,因此下文将着重讨论对实现有重要影响的第三点

特色 有栈协程 无栈协程
可以挂起的位置 任意函数 仅有显式标注了 async 的函数
让出执行权时机 任意位置 仅可显式 yield
上下文存放于(至少需要当前函数的栈,寄存器和重入位置) 由运行时管理的协程栈 对象或闭包
如何挂起 运行时暂停当前函数的执行 把普通函数编译成可重入函数,挂起时直接结束当前函数
如何恢复 由运行时从协程栈中提取局部变量和挂起位置 重新调用函数,函数内从 continuation 中提取 label 并找到相应位置
额外内存开销 每个协程都需要独立的栈空间,可以设计成动态大小来减少开销 接近于零
需要 运行时支持 编译器/预处理器支持
已有代码 可能不改写 需要改写

go lua ruby 采取了前者,python(asyncio) kotlin rust 以及 javascript 选择了后者。

kotlin 的协程实现可以参见这篇博客,在此仅作简要说明。由于 JVM 的闭包是只读的,所以 kotlin 选择了使用对象存放上下文。因此在每个 suspend(kotlin 里对 async 的叫法)函数中,都会生成一个 continuation 对象,它的 label 属性存放恢复点,result 属性存放调用结果,其他属性存放 this 对象,参数和局部变量。在调用异步方法时,额外传入 continuation,如果需要挂起,则结束当前函数,需要恢复时则调用 continuation 的 invokeSuspend 方法,它会用更新过的 continuation 调用原方法,原方法的局部变量从 continuation 中恢复; 否则把 label 值加上 1,进入下一个状态。这一过程正是大名鼎鼎的 CPS 变换

那么 TypeScript 是怎么做的呢?

1. 从 Promise 开始

和 kotlin 那样白手起家不同,JS 从 ES6 开始就有了异步的基础设施,具体说来则是 Promise 和 Generator,JS 的 async 函数是建立在前两者的基础上的。不妨考察如下程序

ts
async function foo(s: string) {
const url = 'sfsdfsdf'
try {
const a = await fetch(s)
const b = await fetch(url)
const c = await fetch('dasdsa')
return [a, b, c]
} catch {
return 'error'
}
}

把编译目标设置为 ES2015,并对编译结果美化,可以得到如下的代码

js
function __awaiter(thisArg, _arguments = [], P = Promise, generator) {
return new P(function (resolve, reject) {
function fulfilled(value) {
try {
step(generator.next(value))
} catch (e) {
reject(e)
}
}
function rejected(value) {
try {
step(generator.throw(value))
} catch (e) {
reject(e)
}
}
function step(result) {
result.done ? resolve(result.value) : P.resolve(result.value).then(fulfilled, rejected)
}
step((generator = generator.apply(thisArg, _arguments)).next())
})
}
function foo(s) {
return __awaiter(this, undefined, undefined, function* () {
const url = 'sfsdfsdf'
try {
const a = yield fetch(s)
const b = yield fetch(url)
const c = yield fetch('dasdsa')
return [a, b, c]
} catch (_a) {
return 'error'
}
})
}

可以看到,原来的 async 函数被编译为了对__awaiter的调用,函数体则几乎不变地被当作第四个参数传进去,只不过从 async 函数变成了生成器函数,而 async 则变成了 yield。

__awaiter接受四个参数,分别是 this 对象,foo 函数的 arguments(虽然这是不推荐的行为,但还是有人会直接访问它),Promise 实现(可能有人会想用不同于原生 Promise 的某个实现,比如bluebird),以及生成器函数. 那么__awaiter具体做了什么呢?

首先,一个 async 函数应当返回一个 Promise,所以先要满足这个要求

js
return new P(function (resolve, reject) {
...
})

其次,在这个 Promise 的构造函数内,使用正确的 this 和 arguemnts 调用生成器函数,获得迭代器,并把第一个值传入step函数

js
generator = generator.apply(thisArg, _arguments)
step(generator.next())

step函数用来处理迭代器返回的每个值。先判断当前的迭代器是否完成,完成的话直接resolve它的值即可,async 函数返回的 Promise 此时进入fulfilled状态

js
function step(result) {
result.done ? resolve(result.value) : P.resolve(result.value).then(fulfilled, rejected)
}

如果没完结的话,则把fulfilled以及rejected挂在迭代器返回的 Promise 上。注意迭代器返回的不一定是 Promise,需要Promise.resolve对其进行包装

fulfilled和rejected都用于处理返回的 Promise 其中的值,这里只看其中一个

js
function fulfilled(value) {
try {
step(generator.next(value))
} catch (e) {
reject(e)
}
}

可以看到当返回的 Promise 进入fulfilled状态时,fulfilled会把这个值传给迭代器以获取下一迭代,并把返回值再传入step中. 如果这一过程中迭代器抛出错误,则使 async 函数返回的 Promise 进入rejected状态.

结论上说来,JavaScript 中的 async 函数的核心即是返回 Promise 的迭代器。迭代器每返回一个 Promise,调用者都会等待它完成,之后把得到的值再传回迭代器,并获取下一个 Promise。当迭代器完成或者报错时,async 函数返回的 Promise 即可进入对应状态.

这是典型的用受限协程实现第一类协程的方法,尽管受限协程只能把执行权让给调用者,然而调用者可以把返回的东西转交给调度器,这样就能实现把执行权让给任意协程。

2. 到 Generator 结束

读到这里可能有人发现,第零节和第一节似乎并没有什么关系,闭包和 switch...case 都没有出现,这是因为 JavaScript 的生成器掩盖了这一复杂性。把前文里的生成器函数继续编译下去,则能得到如下代码. 这段代码即使在美化之后也非常复杂,不过可以在 TSC 里看到一些说明,我把它写进了注释中

js
enum Instruction {
Next, // 开始或是用 next 恢复生成器
Throw, // 用 throw 恢复生成器
Return, // 用 return 恢复生成器
Break, // 跳转到指定的 label
Yield, // 把执行权让给调用者并传一个值
YieldStar, // 把执行权让给另一个迭代器
Catch, // 处理生成器内部抛出的例外
Endfinally // 恢复进入 finally 块之前的最后一个指令
}
// thisArg存储生成器的 this 指向,body 则是要处理的生成器函数
function __generator(thisArg, body) {
let continuation = {
label: 0, // 就是前面提到的 label
sent() {
if (t[0] % 2 === 1) throw t[1]
return t[1]
}, // 把得到的值送给body
trys: [],
// try 语句块的位置,是一个四元组的数组,分别是 try 的起始和结束 label,
// catch 块的起始 label,finally 块的起始 label
ops: []
// 操作
}
let f, // 指示生成器正在运行
y, // 为yield*准备
t // 多功能的临时变量
return {
next: verb(Instruction.Next),
throw: verb(Instruction.Throw),
return: verb(Instruction.Return),
[Symbol.iterator]() {
return this
}
}
// 工具函数
function verb(n) {
return function (v) {
return step([n, v])
}
}
// 处理迭代器
function step(op) {
if (f) throw new TypeError('Generator is already executing.')
while (continuation) {
// 实现细节略去
}
return { value: op[0] ? op[1] : undefined, done: true }
}
}
function foo(s) {
let url, a, b, c, err
return __generator(this, function (continuation) {
switch (continuation.label) {
case 0:
url = 'sfsdfsdf'
continuation.label = 1
case 1:
continuation.trys.push([1, 5, , 6])
return [Instruction.Yield, fetch(s)]
case 2:
a = continuation.sent()
return [Instruction.Yield, fetch(url)]
case 3:
b = continuation.sent()
return [Instruction.Yield, fetch('dasdsa')]
case 4:
c = continuation.sent()
return [Instruction.Return, [a, b, c]]
case 5:
err = continuation.sent()
return [Instruction.Return, 'error']
case 6:
return [Instruction.Return]
}
})
}

可以看到与 kotlin 的实现不同,局部变量使用闭包保存,只有需要传递的状态才被放在continuation对象里,因此局部变量只需初始化一次,而非每次都要从continuation中还原。每个yieldreturn关键字都会生成一个对应的case分支,最后还有一个额外的默认返回分支.


  1. Moura, Ana Lúcia De, and Roberto Ierusalimschy. "Revisiting coroutines." ACM Transactions on Programming Languages and Systems (TOPLAS) 31.2 (2009): 6.
© 2016 - 2024Austaras Devas