Home Manual Reference Source Test Repository

es6/operator/merge.js

import { Observable } from '../Observable';
import { ArrayObservable } from '../observable/ArrayObservable';
import { MergeAllOperator } from './mergeAll';
import { isScheduler } from '../util/isScheduler';
/* tslint:enable:max-line-length */
/**
 * 创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中的所有值。
 *
 * <span class="informal">通过把多个 Observables 的值混合到一个 Observable 中
 * 来将其打平。</span>
 *
 * <img src="./img/merge.png" width="100%">
 *
 * `merge` 订阅每个给定的输入 Observable (给定的源或作为参数的 Observable ),然后只是
 * 将所有输入 Observables 的所有值发送(不进行任何转换)到输出 Observable 。所有的输入
 * Observable 都完成了,输出 Observable 才能完成。任何由输入 Observable 发出的错误都
 * 会立即在输出 Observalbe 上发出。
 *
 * @example <caption>合并两个 Observables: 时间间隔为1秒的 timer 和 clicks</caption>
 * var clicks = Rx.Observable.fromEvent(document, 'click');
 * var timer = Rx.Observable.interval(1000);
 * var clicksOrTimer = clicks.merge(timer);
 * clicksOrTimer.subscribe(x => console.log(x));
 *
 * @example <caption>合并三个 Observables ,但只能同时运行两个</caption>
 * var timer1 = Rx.Observable.interval(1000).take(10);
 * var timer2 = Rx.Observable.interval(2000).take(6);
 * var timer3 = Rx.Observable.interval(500).take(10);
 * var concurrent = 2; // 参数
 * var merged = timer1.merge(timer2, timer3, concurrent);
 * merged.subscribe(x => console.log(x));
 *
 * @see {@link mergeAll}
 * @see {@link mergeMap}
 * @see {@link mergeMapTo}
 * @see {@link mergeScan}
 *
 * @param {ObservableInput} other 可以与源 Observable 合并的输入 Observable 。
 * 可以给定多个输入 Observables 作为参数。
 * @param {number} [concurrent=Number.POSITIVE_INFINITY] 可以同时订阅的输入
 * Observables 的最大数量。
 * @param {Scheduler} [scheduler=null] 用来管理输入 Observables 的并发性的
 * 调度器。
 * @return {Observable} 该 Observable 发出的项是每个输入 Observable 的结果。
 * @method merge
 * @owner Observable
 */
export function merge(...observables) {
    return this.lift.call(mergeStatic(this, ...observables));
}
/* tslint:enable:max-line-length */
/**
 * 创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中值。
 *
 * <span class="informal">通过把多个 Observables 的值混合到一个 Observable 中来将其打平。</span>
 *
 * <img src="./img/merge.png" width="100%">
 *
 * `merge` 订阅每个给定的输入 Observable (作为参数),然后只是将所有输入 Observables 的所有值发
 * 送(不进行任何转换)到输出 Observable 。所有的输入 Observable 都完成了,输出 Observable 才
 * 能完成。任何由输入 Observable 发出的错误都会立即在输出 Observalbe 上发出。
 *
 * @example <caption>合并两个 Observables: 时间间隔为1秒的 timer 和 clicks</caption>
 * var clicks = Rx.Observable.fromEvent(document, 'click');
 * var timer = Rx.Observable.interval(1000);
 * var clicksOrTimer = Rx.Observable.merge(clicks, timer);
 * clicksOrTimer.subscribe(x => console.log(x));
 *
 * // 结果如下:
 * // 每隔1s发出一个自增值到控制台
 * // document被点击的时候MouseEvents会被打印到控制台
 * // 因为两个流被合并了,所以你当它们发生的时候你就可以看见.
 *
 * @example <caption>合并3个Observables, 但是只并行运行2个</caption>
 * var timer1 = Rx.Observable.interval(1000).take(10);
 * var timer2 = Rx.Observable.interval(2000).take(6);
 * var timer3 = Rx.Observable.interval(500).take(10);
 * var concurrent = 2; // the argument
 * var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
 * merged.subscribe(x => console.log(x));
 *
 * // 结果如下:
 * // - timer1和timer2将会并行运算
 * // - timer1每隔1s发出值,迭代10次
 * // - timer2每隔1s发出值,迭代6次
 * // - timer1达到迭代最大次数,timer2会继续,timer3开始和timer2并行运行
 * // - 当timer2达到最大迭代次数就停止,timer3将会继续每隔500ms发出数据直到结束
 *
 * @see {@link mergeAll}
 * @see {@link mergeMap}
 * @see {@link mergeMapTo}
 * @see {@link mergeScan}
 *
 * @param {...ObservableInput} observables 合并到一起的输入Observables。
 * @param {number} [concurrent=Number.POSITIVE_INFINITY] 可以同时订阅的输入 Observables 的最大数量。
 * @param {Scheduler} [scheduler=null] 调度器用来管理并行的输入Observables。
 * @return {Observable} 该 Observable 发出的项是每个输入 Observable 的结果。
 * @static true
 * @name merge
 * @owner Observable
 */
export function mergeStatic(...observables) {
    let concurrent = Number.POSITIVE_INFINITY;
    let scheduler = null;
    let last = observables[observables.length - 1];
    if (isScheduler(last)) {
        scheduler = observables.pop();
        if (observables.length > 1 && typeof observables[observables.length - 1] === 'number') {
            concurrent = observables.pop();
        }
    }
    else if (typeof last === 'number') {
        concurrent = observables.pop();
    }
    if (scheduler === null && observables.length === 1 && observables[0] instanceof Observable) {
        return observables[0];
    }
    return new ArrayObservable(observables, scheduler).lift(new MergeAllOperator(concurrent));
}
//# sourceMappingURL=merge.js.map