RxJS速查表

发表于更新于阅读时长 11 分钟

基于 RxJS 6+, 文中各函数的参数按顺序称为 a, b, c, d...

0. 前言

虽然能读到这篇文章, 通常说明你可能并不需要对 RxJS 的介绍, 但我决定无论如何还是尝试一下.

RxJS 能用来做什么? 即使你对 RxJS 和相关的知识没有什么了解, 也并不妨碍你把它当作高级一些的 EventEmitter, 又或者是能多次 Resolve 的 Promise, 我相信在 Angular 生态圈中大量的人就是这样做的. 而更偏向理论的介绍则把 RxJS 比喻成作用在异步数据上的 lodash

我很喜欢这种说法, 但它引入了不必要的喻体(lodash). 更一般地, 在 RxJS 的视角中, 我们可以将异步数据(前端天生是异步的)看作以出现次序为下标的数组, 并且对这个数组(虽然它通常不存在代码中而只存在于时间轴上)实行平时我们对数组进行的操作. (更准确的说法是数组和 Observable 都是Functor)这大大简化了我们在程序开发中使用事件驱动程序设计的代价. 事件驱动能为前端开发带来非常多的好处, 其中最重要的是减少了前端最常见的问题之一——失去同步(状态与视图之间,组件与组件之间)——的出现. 更进一步, 它可以减轻复杂程序中最大的负担, Global Mutable State, 使得代码中的数据流变得更加清晰, 提高了可维护性.

如果你觉得上面的说明太空洞(它确实是), 那么我想推荐一段简短又能展现 RxJS 醍醐味的代码, 我把它抄到了我的Github

1. 概念和创建类操作符

  • 冷热
    冷的 Observable 在每次被订阅时创建新的数据生产者, 而热的 Observable 在被订阅时使用已有的生产者. RxJS 中的绝大多数 Observable 都是冷的
  • chainable 和 pipeable
    前者即ob.map(fn1).reduce(fn2), 后者即ob.pipe(map(fn1), reduce(fn2)), RxJS 6 中为了减小包体积, 方便 tree shaking 的缘故选择了后者(前者因为修改了 Observable class 而需要深链接)
  • 状态
    除非显示的调用了 complete 或 error, 一个 Observable 即为未完结状态
  • 数据来源
    如果有 source, 则来自于 source(一般出现在传入操作符的时候); 没有则来源于传入的 subscribe 函数
  • new 和 Observable.create
    直接调用 Observable 的构造函数,参数为Subscriber => function | { unsubscribe: function }, Subscriber 即{ next, error, complete }. 返回值在取消订阅时调用
  • lift
    传入操作符, 产生新的 Observable, 主要用于实现操作符
  • of
    产生按参数顺序发射的冷的 Observable
  • range
    产生一个流, 它的值从 a 开始, 每次增加 1, 直到大于等于 b 为止
  • generate
    接受四个参数, 等价于for (a();b();c()) { d() }
  • repeat
    当上游的流完成时取消再重新订阅, 重复这个过程直到次数超过 a
  • repeatWhen
    接受一参数Subject => Observable, a 的上游完结时 a 的参数 Subject 就发射一个值; 每当 a 返回的流发射一个值的时候就取消再重新订阅一次上游
  • empty, throw 和 never
    立刻发射完成, 立刻发射错误, 什么值都不发射
  • interval
    从 a ms 开始, 每 a ms 发射一个从 0 开始递增的值
  • timer
    从 a ms 开始, 如果 b 为空, 产生 0 ; 不为空, 每 b ms 发射一个递增的值
  • from
    如果 a 为 可迭代(Iterable)值, 则等价于 for ... of; 如果 a 为 Promise, 则当其 resolve 或 reject 时发射对应的值
  • fromEvent
    等价于a.addEventListener(b)
  • fromEventPattern
    订阅时调用 a, 取消订阅时调用 b, a 和 b 的类型均为(handler: function) => void, handler 为上游发射值时的回调
  • ajax 和 websocket
    把 ajax/websocket 事件转换成热的流
  • defer
    接受一参数() => Observable, 直到被订阅时才去调用 a. 用于 lazy 的(在被订阅时)才创建 Observable. 常用于把热的流转换成冷的

2. Subject

  • Subject
    同时是 Observer (即可以被传进subscribe中, 或者说有next, error, complete)和 Observable . 常用于将冷的 Observable 转换成热的. 在 Angular 中也用于实现 EventEmitter. 提供 asObservable 以防将 next 等方法暴露给外界
  • 状态
    直到调用 error 或 complete 时 subject 都处于为未完结状态. 进入完结或错误状态后再订阅 Subject 会立刻收到对应的完结或错误信号
  • 多个上游和多个下游
    若一个 Subject 订阅了多个上游, 则在任意一个上游完结时完结, 应当使用 merge 而尽量避免这种情况; 若一个 Subject 被多个下游订阅, 在任意下游报错且未捕捉时, 所有下游都会停止收到数据
  • AsyncSubject
    在上游完成或抛错之后发射源最后一个值或错误, 并且缓存该值. 如果在源完成之后订阅该 AsyncSubject, 它会立刻收到这个值
  • BehaviorSubject
    被订阅时立刻发射上游此时发射过的最后一个值; 如果此时上游未发射过值, 则发射种子值 a. 可以通过 getValue 同步地获取这个值
  • ReplaySubject
    被订阅时发射上游此时在最近 b 时间内发射过的最后 a 个值. 如果不传入 a, b 则发射上游的全部值. 可以把ReplaySubject(1)当作无始值的 BehaviorSubject 来用

3. 过滤类操作符

  • filter
    接受一参数(value: T, index: number) => boolean, 返回结果为 true 时转发上游的值, 否则抛弃它
  • first 和 last
    不传入参数时, 发射第一个/最后一个值并完结. 传入一至三个参数时, 发射第一个/最后一个满足 a 的值, 并传入 b, 发射 b 的返回值, 之后完结; 若直到上游完结时都没有值满足 a, 则发射一 EmptyError 或传入的 c
  • take
    转发上游的数据直到数量达到 a 为止, 并发射完结; 如果上游数据不足 a 个, 则在上游完结时完结
  • takeLast
    上游完结时发射最后 a 个数据
  • takeWhile
    接受一参数(value: T, index: number) => boolean, 转发上游数据直到 a 的返回值为 false(这个值也不会被转发)
  • takeUntil
    接受一个 Observable 为参数, 当 a 发射值, 发射错误或完结时停止转发上游的值
  • skip, skipWhile 和 SkipUntil
    和对应的 take 操作符结构类似, 语义相反, 忽视而不是只转发前若干个
  • throttle, debounce, audit
    接受一个参数value => Observable
  • throttle
    转发上游的第一个数据并调用入参函数, 直到返回的 Observable 发射第一个值之前忽略上游的其他所有值; 重复前述过程. "节流"
  • debounce
    上游发射数据时, 以之为参数调用入参函数, 如果返回的 Observable 发射第一个值时之前没有收到新值, 则发射这个值; 否则重复前述过程. "去抖动"
  • audit
    用上游的第一个数据调用入参函数, 在返回的 Observable 发射第一个值时转发此期间上游发射的最后一个值; 重复前述过程
  • sample
    接受一个 Observable 为参数, 该 Observable 发射值时转发上游发射的最后一个值. "采样"
  • throttleTime, debounceTime, auditTime 和 sampleTime
    operatorTime(a: number)等价于operator(() => timer(a)) sample(a: number)等价于sampleTime(interval(a))
  • distinct
    只在数据第一次出现时转发给下游. 接受两个参数value => T, Observable<any>, 用 a 的调用结果作为比较基准, 当 b 发射值时"忘记"所有已出现数据
  • distinctUntilChanged
    接受一参数(a, b) => boolean, 当前值和前一个值不同或它们使得比较函数返回 false 时发射当前值
  • distinctUntilKeyChanged
    接受一参数key: string, 等价于distinctUntilChanged((a, b) => a[key] === b[key])
  • ignoreElement
    忽略所有元素, 只转发完成和错误
  • elementAt
    只转发第 a+1 个数据, 如果在此前上游完结, 则发出 b
  • single
    如果上游只有一个值使a(value)为 true, 则转发这个值; 否则发射一个错误

4. 合并类操作符

  • concat
    保存所有传入的流, 从前到后订阅. 前一个流完结以后, 订阅下一个流
  • merge
    同时订阅所有流. 所有流都完结以后才会完结. 可以指定同时接受值的流的个数
  • zip
    同时订阅所有流, 缓存所有流发射的值, 直到每个流各发射一个值之后打包发射. 当某个流发射的所有值都被 zip 发射之后完结
  • combineLatest
    同时订阅所有流, 每个流发射值的时候和其他流的最近一个值打包再发射. 第一个值在所有流都发射值之后发射, 所有上游完结之后再完结. 由于 js 的单线程特质, 当作用于多个有相同上游的流时会有问题, 会在该上游发射值的时候发射多个值
  • withLatestFrom
    同时订阅所有流, 控制流发射值的时候和辅助流的最近一个值打包再发射. 在所有辅助流都发射值之后, 控制流发射第一个值时发射第一个值, 在控制流完结时就完结. 没啥用
  • concatAll, mergeAll, zipAll 和 combineAll
    和同名操作符功能类似, 只不过是作用于高阶流, 对其内部流施加变化, 即 Observable<Observable<T>> => Observable<T'>. concatAll 可能造成数据积压. zipAll 和 combineAll 只会在高阶流完成之后才发射值.
  • switch
    作用于高阶流. 每当高阶流产生新流时, 取消当前流的订阅并转去订阅新流. 高阶流和当前订阅的流都完结时完结
  • exhaust
    作用于高阶流. 如果当前流尚未完结, 而高阶流发射了新的流, 则忽略掉新流.
  • race
    订阅多个流中第一个发射值的流. 类似于Promise.race
  • forkJoin
    在所有流都完结后打包发射它们的最后一个值. 类似于Promise.all
  • startWith
    在流的开始给流发射一个或多个值
  • pairwise
    发射当前值和上一个值组合成的数组. 跳过第一个值

5. 转化类操作符

  • map
    接受上游发射的值, 向下游发射a(value, index). 可用 b 指定 a 中的 this
  • mapTo
    mapTo(a: T)等价于map(() => a)
  • pluck
    pluck(...a: string[])等价于map(value => value[a[0]][a[1]]...). 用于取特定 key 对应的值. 访问深层值不会报错. 目前类型是坏的
  • window
    接受一个 Observable 作为参数, 产生一新流转发上游发射的值, 直到控制流发射值; 之后重复此过程. 若控制流完结则该流也完结
  • windowTime
    每 b ms(若没传入 b 则为每 a ms)产生一个新的流, 从此刻开始 a ms 内此内部流会转发上游发出的数据
  • windowCount
    每 b 个数据(若没传入 b 则为每 a 个)产生一个新的流, 从该数据开始转发 a 个数据
  • windowWhen
    接受一参数() => Observable, 调用 a, 产生一个转发上游数据的流, 此流在 a 返回的流发射值时完结; 此后重复此过程
  • windowToggle
    接受两参数open$: Observable, close: value => Observable, 每当 open$ 发射一个数据时, 产生一个新的流转发上游的值, 同时用这个值调用 close, 当 close 返回的流发射值时使前述内部流完结
  • buffer, bufferTime, bufferCount, bufferWhen 和 bufferToggle
    和对应的 window 操作符类似, 只不过不产生新流, 而是在缓存结束时把此期间上游产生的数据打包成数组一并发射; 或者说, bufferOp等价于对mergeMapTo(source$.windowOp.reduce)
  • bufferTime
    每 b ms(若没传入 b 则为每 a ms)缓存从此刻开始 a ms 内上游发出的数据, 打包成数组发给下游. 若传入 c, 则每组缓存数据的个数不超过 c 个
  • concatMap, mergeMap, switchMap 和 exhaustMap
    接受一参数(value: T, index: number) => Observable<R>, 使得Observable<T>变成Observable<R>. 分别等价与对一个 Observable 调用 map 之后再调用 concatAll/mergeAll/switch/exhaust 接受一可选参数selector: (outValue, inValue, outIndex, inIndex) => R, 内部流发射值时调用. 四个参数分别是上游发射的值, 内部流发射的值, 上游值的次序, 内部流值的次序, 发射返回值给下游
  • mergeMap
    又叫 flatMap. 和 map 一起可以当作promise.then 来使用
  • switchMap
    常用来处理网络请求
  • concatMapTo, mergeMapTo 和 swicthMapTo
    operatorTo(a$: Observable)等价于operator(() => a$)
  • groupBy
    产生高阶流. 用上游发射的值调用 a, 按照返回结果分类; 如果返回结果之前没有出现过, 则产生一个新流, 否则把上游的值转发给对应的内部流
  • partition
    目前类型是坏的. 产生两个流, 分别转发使得 a 返回值为真/假的数据
  • scan
    接受一参数(acc: T', val: T, ind: number) => T', 上游每发射一个值, 就调用 a, 向下游发射返回值并把返回值作为下一次调用的 acc. 如果传入 b, 则 acc 的初始值为 b, 否则直接转发第一个值而不调用 a . 用于存储状态, 是各种基于 RxJS 的状态管理库的核心操作符
  • mergeScan
    接受两参数(acc: T', val: T, ind: number) => Observable<T'>, seed: T', seed 为 acc 的初始值, 上游发射数据时产生一个流, 和之前产生的流进行 mergeAll(因此产生的不是高阶流), acc 记录所有流中最迟发出的值. 没啥用

6. 多播类操作符

  • multicast
    接受一参数Subject | () => Subject, 返回一 ConnectableObservable. 若传入的是函数, 则在开始时或上游每次完结之后调用这个函数, 用其返回的 Subject 去订阅上游 若传入第二个参数Observable<T> => Obervable<R>, 则每次被下游订阅时用第一个参数(或其产生的 Subject)调用它, 让下游去订阅返回的流. 在下游对上游数据有多重依赖的时消除对上游的多次订阅
  • ConnectableObservable
    Observable 的子类, 当调用 connect 方法时, 它会调用内部的 Subject 去订阅上游.
  • pulish
    等价于multicast(new Subject())
  • pulishLast
    等价于multicast(new AsyncSubject())
  • publishReplay
    等价于multicast(new ReplaySubject())
  • publishBehavior
    等价于multicast(new BehaviorSubject())
  • share
    等价于multicat(() => new Subject()).refCount()
  • shareReplay
    等价于multicat(() => new ReplaySubject()).refCount(), 常用于缓存网络请求
  • refCount
    当订阅数0 -> 1时, 调用 connect; 订阅数1 -> 0时, 取消订阅. 不要接在publish后面, 否则在第一次完结之后订阅的都收不到值

7. 数学类和条件类操作符

  • count
    上游完结后, 发射上游发射的所有数据的个数
  • sum 和 average
    上游完结后, 发射上游发射的所有数据的和/平均值
  • max 和 min
    上游完结后, 发射上游发射的所有数据中最大/最小的那个值, 接受一个函数(a, b) => number作为比较函数
  • reduce
    上游完结后, 把上游发射的所有数据作为一个数组, 对其进行 reduce 操作, 发射结果
  • every, find 和 findIndex
    均接受一个函数作为判断函数, 其签名为(value, index, source$) => boolean, 且在发射值后立即完结.
  • every
    若上游发射的某个值使得判断函数的返回值为 false, 则立即发射 false; 否则在上游完结时发射 true.
  • find 和 findIndex
    当上游发射的某个值使得判断函数的返回值为 true, 则立即发射该数据/该数据的 index; 否则在上游完结时发射 undefined/-1
  • isEmpty
    在上游发射第一个值时发射 true; 如果直到完结时上游都没有发射值, 发射 false
  • defaultIsEmpty
    在上游发射值时转发这个值; 如果直到完结时上游都没有发射值, 发射传入的值, 若没有传入值则发射 null

8. 错误处理类和辅助工具类操作符

  • catchError
    捕捉上游发来的错误, 产生一个新流, 使下游转而去订阅这个流
  • onErrorResumeNext
    类似于 catchError, 只不过是在上游发射错误或完结时调用
  • retry
    当上游发射错误时, 取消再重新订阅上游, 直到重试次数达到 a 为止
  • retryWhen
    接受一参数err$: Observable<Error> => Observable, err$ 流会转发上游所有错误, 当 err$ 第一次发射值时, 调用 a, 每当 a 发射值时, 取消再重新订阅上游. 可以以此为基础完成各种复杂的重试运算符
  • finalize
    在上游发射错误或完结时调用 a
  • tap
    不影响流, 只会产生副作用. 常用于调试和用 Subject 向上游传递数据
  • delay
    若 a 为数字, 则在上游发射值后 a ms 转发该值给下游; 若为Date, 则延迟第一个值转发的时间到该时间点
  • delayWhen
    接收一参数(val: T, ind: number) => Observable, 当上游发射值时用该值调用 a, 当 a 返回的流发射值时转发上游值
  • materialize
    用于把值, 完成信号, 错误信号统一包装成 Notification, 以免对下游产生副作用
  • dematerialize
    用于倒转 materialize 的作用
  • timestamp
    把上游传来的数据包装成{ value, timestamp }, 其中 timestamp 为当前时间
  • observeOn
    传入一个 Scheduler, 由它来控制向下游发射数据的时机
  • subscribeOn
    传入一个 Scheduler, 由它来控制下游订阅源的时机

9. Scheduler

  • Scheduler
    用于决定数据在何时向下游推送.
  • 使用 Scheduler
    可以使用scheduler.schedule来调度函数, 或者传入特定操作符作为参数 可使用 scheduler 的操作符包括创建类操作符以及 concat 和 merge
  • asapScheduler
    使用Promise.resolve来延迟数据的推送
  • asyncScheduler
    使用setInterval来延迟数据的推送
  • animationFrameScheduler
    使用requestAnimationFrame来延迟数据的推送
  • queueScheduler
    用在操作符中或调用schedule方法时第二个参数不传入或等于 0 时同步执行, 否则等同于 asyncScheduler
© 2016 - 2023Austaras Devas