Home Manual Reference Source Test Repository

es6/Observable.js

import { root } from './util/root';
import { toSubscriber } from './util/toSubscriber';
import { observable as Symbol_observable } from './symbol/observable';
/**
 * 表示在任意时间内的任意一组值。 这是 RxJS 最基本的构建块。
 *
 * @class Observable<T>
 */
export class Observable {
    /**
     * @constructor
     * @param {Function} subscribe 当 Observable 初始订阅的时候会调用该方法. 该函数接受 Subscriber, 这样就可以 `next`
     * 值,或者 `error` 方法会被调用以引发错误,或者 `complete` 被调用以通知成功的完成。
     */
    constructor(subscribe) {
        this._isScalar = false;
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }
    /**
     * 创建一个新的 Observable,以它作为源,并传递操作符的定义作为新的 observable 操作符。
     * @method lift
     * @param {Operator} operator 定义了如何操作 observable 的操作符。
     * @return {Observable} 应用了操作符的新 observable。
     */
    lift(operator) {
        const observable = new Observable();
        observable.source = this;
        observable.operator = operator;
        return observable;
    }
    /**
     * 调用 Observable 的执行并注册 Observer 的处理器以便于发出通知。
     *
     * <span class="informal">当你拥有这些 Observables 却仍然什么也没发生时使用它。</span>
     *
     * `subscribe` 不是一个常规的操作符,而是方法,它调用 Observables 内部的 subscribe 函数。它也许是一个你传递给 {@link create}
     * 静态工厂的方法,但是大多数情况下它是一个库的实现,它定义了 Observable 什么时候发出,发出什么。这意味着实际上是 Observable 开始工
     * 作的那一刻才调用 subscribe , 而不是像人们经常认为的那样,即创建 Observable 的时候。
     *
     * 除了开始 Observable 的执行,该方法允许你监听 Observable 发出的值,也包括完成或者发生错误。你可以通过以下两种方式达到
     * 这种目的。
     *
     * 第一种方式是创建一个实现了 {@link Observer} 接口的对象。它应该实现接口定义的方法,但是要注意的是它仅仅是一个普通的 JavaScript
     * 对象,你可以用任何你想要的方式创建(ES6 class, 常见的构造函数, 对象字面量等等)。 特别地,不要尝试使用任何 RxJS 的实现细节去创建
     * Observers-你不需要这样做。 同样要记住,你的对象不需要实现所有的方法。如果你发现自己创建一个不做任何事情的方法,你可以简化它。
     * 不过要注意,如果 `error` 方法没用被提供,所有的错误都不会被捕获。
     *
     * 第二种方式是放弃 Observer 对象,只需提供回调函数来替代它的方法。这意味你可以给 `subscribe` 提供3个方法作为参数, 第一个回调
     * 等价于 `next` 方法,第二个等价于 `error` 方法,第三个等价于 `complete` 方法。就如同 Observer 一样,如果你不需要监听某中某个,你可以
     * 省略该函数,通过传递 `undefined` 或者 `null`,因为 `subscribe` 可以通过在函数调用中的位置识别了这些函数。提到 `error` 函数,正如上文所述,
     * 如果没有提供,Observable 发出的错误会被抛弃。
     *
     * 不管你使用了哪种方式调用 `subscribe`,所有情况都返回 Subscription 对象。该对象允许你调用 `unsubscribe`,该方法会停止 Observable
     * 的工作并且清理 Observable 持有的资源。注意,取消订阅不会调用 `subscribe` 提供的 `complete` 回调函数,`complete` 回调函数是为来自 Observable 的常规完成信号保留的。
     *
     * 记住,提供给 `subscribe` 的回调函数无法保证是被异步地调用。是 Observable 自身决定这些方法的执行。例如:{@link of}
     * 默认同步地发出所有的值。经常查看文档以确认给定的 Observable 被订阅时的行为是怎样的,以及它的默认行为是否可以通过使用 {@link Scheduler} 进行更改。
     *
     * @example <caption>用 Observer 对象订阅</caption>
     * const sumObserver = {
     *   sum: 0,
     *   next(value) {
     *     console.log('Adding: ' + value);
     *     this.sum = this.sum + value;
     *   },
     *   error() { // We actually could just remote this method,
     *   },        // since we do not really care about errors right now.
     *   complete() {
     *     console.log('Sum equals: ' + this.sum);
     *   }
     * };
     *
     * Rx.Observable.of(1, 2, 3) // 同步发出 1, 2, 3,然后完成。
     * .subscribe(sumObserver);
     *
     * // 日志:
     * // "Adding: 1"
     * // "Adding: 2"
     * // "Adding: 3"
     * // "Sum equals: 6"
     *
     *
     * @example <caption>用函数订阅</caption>
     * let sum = 0;
     *
     * Rx.Observable.of(1, 2, 3)
     * .subscribe(
     *   function(value) {
     *     console.log('Adding: ' + value);
     *     sum = sum + value;
     *   },
     *   undefined,
     *   function() {
     *     console.log('Sum equals: ' + sum);
     *   }
     * );
     *
     * // 日志:
     * // "Adding: 1"
     * // "Adding: 2"
     * // "Adding: 3"
     * // "Sum equals: 6"
     *
     *
     * @example <caption>取消订阅</caption>
     * const subscription = Rx.Observable.interval(1000).subscribe(
     *   num => console.log(num),
     *   undefined,
     *   () => console.log('completed!') // 即使当取消订阅时,也不会被调用
     * );
     *
     *
     * setTimeout(() => {
     *   subscription.unsubscribe();
     *   console.log('unsubscribed!');
     * }, 2500);
     *
     * // Logs:
     * // 0 after 1s
     * // 1 after 2s
     * // "unsubscribed!" after 2,5s
     *
     *
     * @param {Observer|Function} observerOrNext [可选] 或者是 observer 对象, 或者是1个到3个处理器,处理已订阅的 Observable
     * 发出的值。
     * @param {Function} error [可选] 由错误导致的终结事件的处理器。如果没有提供处理器,错误将不做处理直接抛弃。
     * @param {Function} complete [可选] 由成功完成导致的终结事件的处理器。
     * @return {ISubscription} 注册处理程序的订阅引用。
     * @method subscribe
     */
    subscribe(observerOrNext, error, complete) {
        const { operator } = this;
        const sink = toSubscriber(observerOrNext, error, complete);
        if (operator) {
            operator.call(sink, this.source);
        }
        else {
            sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink));
        }
        if (sink.syncErrorThrowable) {
            sink.syncErrorThrowable = false;
            if (sink.syncErrorThrown) {
                throw sink.syncErrorValue;
            }
        }
        return sink;
    }
    _trySubscribe(sink) {
        try {
            return this._subscribe(sink);
        }
        catch (err) {
            sink.syncErrorThrown = true;
            sink.syncErrorValue = err;
            sink.error(err);
        }
    }
    /**
     * @method forEach
     * @param {Function} next  observable 发出的每个值的处理器。
     * @param {PromiseConstructor} [PromiseCtor] 用来生成 Promise 的构造函数。
     * @return {Promise} 一个 observable 完成则 resolves,错误则 rejects 的 promise。
     */
    forEach(next, PromiseCtor) {
        if (!PromiseCtor) {
            if (root.Rx && root.Rx.config && root.Rx.config.Promise) {
                PromiseCtor = root.Rx.config.Promise;
            }
            else if (root.Promise) {
                PromiseCtor = root.Promise;
            }
        }
        if (!PromiseCtor) {
            throw new Error('no Promise impl found');
        }
        return new PromiseCtor((resolve, reject) => {
            // Must be declared in a separate statement to avoid a RefernceError when
            // accessing subscription below in the closure due to Temporal Dead Zone.
            let subscription;
            subscription = this.subscribe((value) => {
                if (subscription) {
                    // if there is a subscription, then we can surmise
                    // the next handling is asynchronous. Any errors thrown
                    // need to be rejected explicitly and unsubscribe must be
                    // called manually
                    try {
                        next(value);
                    }
                    catch (err) {
                        reject(err);
                        subscription.unsubscribe();
                    }
                }
                else {
                    // if there is NO subscription, then we're getting a nexted
                    // value synchronously during subscription. We can just call it.
                    // If it errors, Observable's `subscribe` will ensure the
                    // unsubscription logic is called, then synchronously rethrow the error.
                    // After that, Promise will trap the error and send it
                    // down the rejection path.
                    next(value);
                }
            }, reject, resolve);
        });
    }
    _subscribe(subscriber) {
        return this.source.subscribe(subscriber);
    }
    /**
     * An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable
     * @method Symbol.observable
     * @return {Observable} this instance of the observable
     */
    [Symbol_observable]() {
        return this;
    }
}
// HACK: Since TypeScript inherits static properties too, we have to
// fight against TypeScript here so Subject can have a different static create signature
/**
 * 通过调用 Observable 的构造函数,创建一个新的冷 Observable。
 * @static true
 * @owner Observable
 * @method create
 * @param {Function} subscribe? subscriber 函数会传递给 Observable 的构造函数。
 * @return {Observable} 新的冷 observable
 */
Observable.create = (subscribe) => {
    return new Observable(subscribe);
};
//# sourceMappingURL=Observable.js.map