Home Manual Reference Source Test Repository

es6/operator/reduce.js

import { Subscriber } from '../Subscriber';
/* tslint:enable:max-line-length */
/**
 * 在源 Observalbe 上应用 accumulator (累加器) 函数,然后当源 Observable 完成时,返回
 * 累加的结果,可以提供一个可选的 seed 值。
 *
 * <span class="informal">使用 accumulator (累加器) 函数将源 Observable 所发出的所有值归并在一起,
 * 该函数知道如何将新的源值纳入到过往的累加结果中。</span>
 *
 * <img src="./img/reduce.png" width="100%">
 *
 * 类似于 [Array.prototype.reduce()](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/reduce),
 * `reduce` 可以对累加值和源 Observable (过去的)的每个值应用 `accumulator` 函数,
 * 然后将其归并成一个值并且在输出 Observable 上发出。注意,`reduce` 只会发出一个值,
 * 并且是当源 Observable 完成时才发出。它等价于使用 {@link scan} 操作符后面再跟
 * {@link last} 操作符。
 *
 * 返回的 Observable 为由源 Observable 发出的每项应用指定的 `accumulator` 函数。
 * 如果指定了 `seed` 值,那么这个值会作为 `accumulator` 函数的初始值。如果没有指定
 * `seed` 值,那么源中的第一项会作为 `seed` 来使用。
 *
 * @example <caption>计算5秒内发生的点击次数</caption>
 * var clicksInFiveSeconds = Rx.Observable.fromEvent(document, 'click')
 *   .takeUntil(Rx.Observable.interval(5000));
 * var ones = clicksInFiveSeconds.mapTo(1);
 * var seed = 0;
 * var count = ones.reduce((acc, one) => acc + one, seed);
 * count.subscribe(x => console.log(x));
 *
 * @see {@link count}
 * @see {@link expand}
 * @see {@link mergeScan}
 * @see {@link scan}
 *
 * @param {function(acc: R, value: T, index: number): R} accumulator 调用每个
 * 源值的累加器函数。
 * @param {R} [seed] 初始累加值。
 * @return {Observable<R>} 该 Observable 发出单个值,这个值是由源 Observable
 * 发出值累加的结果。
 * @method reduce
 * @owner Observable
 */
export function reduce(accumulator, seed) {
    let hasSeed = false;
    // providing a seed of `undefined` *should* be valid and trigger
    // hasSeed! so don't use `seed !== undefined` checks!
    // For this reason, we have to check it here at the original call site
    // otherwise inside Operator/Subscriber we won't know if `undefined`
    // means they didn't provide anything or if they literally provided `undefined`
    if (arguments.length >= 2) {
        hasSeed = true;
    }
    return this.lift(new ReduceOperator(accumulator, seed, hasSeed));
}
export class ReduceOperator {
    constructor(accumulator, seed, hasSeed = false) {
        this.accumulator = accumulator;
        this.seed = seed;
        this.hasSeed = hasSeed;
    }
    call(subscriber, source) {
        return source.subscribe(new ReduceSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed));
    }
}
/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
export class ReduceSubscriber extends Subscriber {
    constructor(destination, accumulator, seed, hasSeed) {
        super(destination);
        this.accumulator = accumulator;
        this.hasSeed = hasSeed;
        this.index = 0;
        this.hasValue = false;
        this.acc = seed;
        if (!this.hasSeed) {
            this.index++;
        }
    }
    _next(value) {
        if (this.hasValue || (this.hasValue = this.hasSeed)) {
            this._tryReduce(value);
        }
        else {
            this.acc = value;
            this.hasValue = true;
        }
    }
    _tryReduce(value) {
        let result;
        try {
            result = this.accumulator(this.acc, value, this.index++);
        }
        catch (err) {
            this.destination.error(err);
            return;
        }
        this.acc = result;
    }
    _complete() {
        if (this.hasValue || this.hasSeed) {
            this.destination.next(this.acc);
        }
        this.destination.complete();
    }
}
//# sourceMappingURL=reduce.js.map