本文将介绍 Rx 中的 Subject、Observer、Observerable 等单子在生产中的可能用法,而不介绍amb 算子这样的态射,因为介绍态射的文章明显更多。

为方便了解,我提供了一些特性示例,放入一个 js 文件即可执行。在执行之前需要在文件夹里 npm i rxjs bluebird babel-core babel-polyfill babel-preset-stage-0,然后加一个 .babelrc 内书:

{
  "presets": ["stage-0"]
}

然后以 node -r "./node_modules/babel-core/register" -r "./node_modules/babel-polyfill/lib/index.js" 运行。

这么一想,新手想尝试性运行现代 JS 居然还蛮麻烦的嘛?

单子

单子指的是 Promise、Array、Observerable 这样的容器,可以把 number、string、Object 这样的类型提升到新的范畴里。在 TypeScript 里我们所写的尖括号就描述了这一提升:


const aaa: Promise<number>;
const bbb: Array<string>;
const ccc: Observerable<Request>;

态射指的是 map、reduce、filter 这样的函数。我们知道 aaa.map(item => item.someProp) 返回的还是一个 Promise,只是里面装的东西可能不一样了(可能装着一个 error),同样地 ccc.map(item => item.someProp) 返回的还是一个 Observerable,这种返回值的一致性让我们可以组合使用函数,特别是组合使用 lodash 提供的那些函数。

RxJS 说它是 Observerable 的 lodash,指的就是它提供了各种各样的态射,对 Observerable 使用这些函数依然返回 Observerable。

AsyncSubject 「Resolve 多个值的 Promise」

众所周知,Promise 会在 resolve 后将内部的值吐出来,它可以吐出一个值,比如一个 number、一个 string、一个 Object。虽然通过 Object 我们可以一次性解构出好多个值,但这时候 Object 内的几个值是同步地吐出来的,也就是说这个 Promise 后面的 then 只会被调用一次。

AsyncSubject 也差不多,你看:

// 特性示例
import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject();

// 这是把三个值装进了容器里
subject.next(0);
subject.next(1);
subject.next(2);
// complete 就像我们从 Promise 的里面控制它 resolve 了一样
subject.complete();
// subscribe 就像我们对 Promise 使用 .then 一样
subject.subscribe(value => console.log(value));
// 可以拿到最后一个「resolve」的值:
// 2

Observable & Observer

Observable 乃可观察对象,用 create 函数创建。它是一个流,类似于可以异步地 resolve 多个值的 Promise。向流里传值的工作用创建时传给你的 observer 对象来做。

Observer 乃对可观察对象的观察者,可以是一个很简单的、只包含三个键值对的对象,把它传给 subscribe 函数后可以用它观察到 Observable 流中的值。

// 特性示例
import Promise from 'bluebird';
import { Observable } from 'rxjs';

const observable = Observable.create(async observer => {
  // 向流里传值的工作用传给你的 observer 对象来做
  // 此处我们同步地传两个值
  observer.next(1);
  observer.next(2);
  // 再异步地传一个值
  await Promise.delay(1000);
  observer.next(3);
  observer.complete('complete!');
  // Error 与 complete 都类似于终结符(EOF、终止密码子),用了 complete 就不能再接收到 observer.error('error!');
  observer.error('error!');
  // 此处可以 return 一个函数,它会在 Observer unsubscribe 的时候被调用
  // return () => console.log('unsubscribed!');
});

console.log('before1');
// 订阅以得到流中的值,自动创建 Observer
// 注意这边第一个函数其实就是上面 Observable.create(async (observer) => { 中的 ovserver
observable.subscribe(
  nextValue => console.log(`First function get a value:  ${nextValue}`),
  error => console.error(`Second function get an error: ${error}`),
  notification => console.log(`Observer got a complete notification, but undefined is passed in: ${notification}`)
);
console.log('after1');
// 手动创建一个 Observer,一个只包含三个键值对的对象,并用它来订阅流
console.log('before2');
observable.subscribe({
  // 注意这边这个 next 其实就是上面 Observable.create(async (observer) => { 中的 ovserver.next
  next: nextValue => console.log(`First function get a value:  ${nextValue}`),
  error: error => console.error(`Second function get an error: ${error}`),
  complete: notification =>
    console.log(`Observer got a complete notification, but undefined is passed in: ${notification}`),
});
console.log('after2');
// before1
// First function get a value:  1
// First function get a value:  2
// after1

// before2
// First function get a value:  1
// First function get a value:  2
// after2

// First function get a value:  3
// Observer got a complete notification, but undefined is passed in: undefined
// First function get a value:  3
// Observer got a complete notification, but undefined is passed in: undefined

Observable 有冷热的概念

Cold Observable 就是在 create 等创建函数里搞一个数据源,这样每个 Observer 都会有自己的专属数据源:

const source = new Observable(observer => {
  const socket = new WebSocket('ws://someurl'); // 专属数据源
  socket.addEventListener(
    'message',
    e => observer.next(e) // 冷热 Observable 在这是一样的
  );
  return () => socket.close();
});

Cold Observable 每多用一次就会多用一个 socket 连接,需要更多计算资源,但你可以 return () => socket.close()来单独在 unsubscribe 的时候关闭 socket。

Hot Observable 则是吃热的大锅饭,让每个 Observer 用同一个数据源:

const socket = new WebSocket('ws://someurl'); // 大锅饭数据源
const source = new Observable(observer => {
  socket.addEventListener(
    'message',
    e => observer.next(e) // 冷热 Observable 在这是一样的
  );
});

这种共用节省了资源,但是你不方便在 unsubscribe 的时候关闭 socket,因为你不知道还有没有 Observer 在用这个 socket 呢。可以用后面说的 ConnectableObservable 来折衷冷热。

服务端

用 Express 将传入的 HTTP request 转化为流:

// 服务端用法
import { Observable } from 'rxjs';

const observable = Observable.create(observer => {
  expressApp.get('/api/v1/xxxxxx/:id', (req, res) => {
    try {
      const id: number = req.params.id;
      const { limit }: NotificationQuery = req.query;
      // 搞个流出来
      observer.next({ id, limit });
      res.send({ code: 0, data: result });
    } catch (error) {
      observer.error('error!');
      res.send({ code: -1, message: error.toString() });
    }
  });
});

Subject

Subject 是一种中间节点,可以观察别的 Observerable,可以被别的 Observer 观察。

subject.subscribe(某个Observer) 就可以被某个 Observer 视奸,用 subject.next(1) 就可以向视奸它的 Observer 们暴露一根 1 。可攻可受,煞是方便。

// 特性示例
import { Subject } from 'rxjs';

const subject = new Subject();

console.log('before1');
// 先订阅这个流
subject.subscribe(
  nextValue => console.log(`First function1 get a value:  ${nextValue}`),
  error => console.error(`Second function1 get an error: ${error}`),
  notification => console.log(`Observer1 got a complete notification, but undefined is passed in: ${notification}`)
);
console.log('after1');
// 然后可以往流里塞值
subject.next(1);
// 可以将 next 函数传作它用,但一定要记得 bind this,不然会爆 Cannot read property 'closed' of undefined
const putValue = subject.next;
subject::putValue(2);

subject.complete('asdf');
subject.error('error1');
subject.complete('asdf');
subject.error('error2');

// Subject 默认不会缓存之前发送的值,所以来晚的 Observer 就啥也看不到了
// 但来晚的 Observer 会收到离自己最近的终结符,也就是收到 error 或 complete,此处是 error2,而且似乎 error 有比 complete 更高的优先级
console.log('before2');
subject.subscribe({
  next: nextValue => console.log(`First function2 get a value:  ${nextValue}`),
  error: error => console.error(`Second function2 get an error: ${error}`),
  complete: notification =>
    console.log(`Observer2 got a complete notification, but undefined is passed in: ${notification}`),
});
console.log('after2');
// before1
// after1
// First function1 get a value:  1
// First function1 get a value:  2
// Observer1 got a complete notification, but undefined is passed in: undefined
// before2
// Second function2 get an error: error2
// after2

这种只把在订阅之后的数据发射给观察者,而没有记忆性的 Subject ,在 RxJava 里叫 PublishSubject。

BehaviorSubject 有态度的主题

BehaviorSubject 会有态度地为你预留一个值,任何来晚的 Observer 都可以拿到至少一个值。

// 特性示例
import { BehaviorSubject } from 'rxjs';

// 因为 BehaviorSubject 的特性是预留一个值,所以你可以在初始化时传一个值
const subject = new BehaviorSubject(0);

console.log('before1');
// 先订阅这个流
subject.subscribe({
  next: nextValue => console.log(`First function1 get a value:  ${nextValue}`),
  complete: notification => console.log('Observer1 got a complete notification'),
});
console.log('after1');

subject.next(1);
// 关于 complete 和 error 的特性和 Subject 一样,但是这些终结符会挤掉之前预留的值
// 所以下面第二个 Observer 拿不到 next(1) 传入的 1
subject.complete(2);

console.log('before2');
subject.subscribe({
  next: nextValue => console.log(`First function2 get a value:  ${nextValue}`),
  complete: notification => console.log('Observer2 got a complete notification'),
});
console.log('after2');
// before1
// First function1 get a value:  0
// after1
// First function1 get a value:  1
// Observer1 got a complete notification
// before2
// Observer2 got a complete notification
// after2

ReplaySubject

Replay 顾名思义就是重放它曾经拥有的所有值。

注意这时传给它的第一个参数是「记忆数量」,设为 6 就表示只记忆 6 个值,和人类差不多。不传这个值似乎就可以无限记忆了。传 0 和传 1 都是只记住一个,和 BehaviorSubject 效果相同。

第二个参数是「记忆时间」,传 0 表示记忆时间为 0 ms……

// 特性示例
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject(0, 0);

subject.next(0);
subject.next(1);
subject.next(2);

subject.subscribe({
  next: nextValue => console.log(`First function get a value:  ${nextValue}`),
  complete: notification => console.log('Observer got a complete notification'),
});

subject.next(3);
// 记忆力特别差,只能记住订阅前 0 ms 内的值,且只能记住一个
// First function get a value:  2
// First function get a value:  3

ConnectableObservable 多路推送中转节点

有时我们想让一个流变成多个一样的流,用 Cypher 来写就是 (newStream1)<-[:Clone]-(stream)-[:Clone]->(newStream2),有至少三种直观的方法:

前两种使用了 ConnectableObservable。它可以作为中转节点,让多个观察者同时收到数据源推送的消息,类似于 addEventListener,

observable.multicast(subject)

这种方法主要的好处是 subject 也可以用 next 发送值。

// 特性示例
import { Subject, Observable } from 'rxjs';

// 先创建一个可观察对象,作为数据源
const observable = Observable.from([1, 2, 3]);

// 然后我们用一个 Subject 来作为中转节点,此时不用 subscribe,而是用 multicast
const subject = new Subject();
const multicasted = observable.multicast(subject);

multicasted.next(123);

// 然后中转节点可以被多个节点关注
multicasted.subscribe({
  next: value => console.log(`observerA: ${value}`),
});
multicasted.subscribe({
  next: value => console.log(`observerB: ${value}`),
});

// multicast - connect 是一对
multicasted.connect();
// 在 connect 之前 subscribe 到 subject 的 Observer 可以拿到所有的值
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

observable.publish().subscribe(observer)

这种方法主要的好处是书写简洁,直接订阅了事。

// 特性示例
import { Observable } from 'rxjs';

// 先创建一个可观察对象,作为数据源
const observable = Observable.from([1, 2, 3]);

const published = observable.publish();

published.subscribe(value => console.log(`observerA: ${value}`));
published.subscribe(value => console.log(`observerB: ${value}`));

const connection = published.connect();
// 在 connect 之前 subscribe 到 observable 的 Observer 可以拿到所有的值
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

observable.map(i => i)

这种用法的特点是流不共享执行环境。

// 特性示例
import { Observable } from 'rxjs';

// 先创建一个可观察对象,作为数据源
const observable = Observable.from([1, 2, 3]);

const stream1 = observable.map(i => i);
const stream2 = observable.map(i => i);

stream1.subscribe(value => console.log(`observerA: ${value}`));
stream2.subscribe(value => console.log(`observerB: ${value}`));

observable.subscribe();
// 在 connect 之前 subscribe 到 observable 的 Observer 可以拿到所有的值
// observerA: 1
// observerA: 2
// observerA: 3
// observerB: 1
// observerB: 2
// observerB: 3

参考

RxJS 核心概念之 Subject

RxJS Overview

Hot vs Cold Observables

简单介绍函数式编程中的 Functor(函子),Applicative(加强版函子),Monad(单子)