Home Manual Reference Source Test Repository

es6/operator/audit.js

import { tryCatch } from '../util/tryCatch';
import { errorObject } from '../util/errorObject';
import { OuterSubscriber } from '../OuterSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
/**
 * 在另一个 Observable 决定的时间段里忽略源数据,然后发出源 Observable 中最新发出的值,
 * 然后重复此过程。
 *
 * <span class="informal">就像是{@link auditTime}, 但是沉默持续时间段由第二个 Observable 决定。</span>
 *
 * <img src="./img/audit.png" width="100%">
 *
 * `audit` 和 `throttle` 很像, 但是发出沉默时间窗口的最后一个值, 而不是第一个。只要 audit 的内部时间器被禁用,
 * 它就会在输出 Observable 上发出源 Observable 的最新值,并且当时间器启用时忽略源值。初始时,时间器是禁用的。
 * 只要第一个源值到达,时间器是用源值调用 durationselector 方法启用,返回 "duration" Observable。 当 duration Observable
 * 发出数据或者完成时,时间器禁用,然后输出 Observable 发出最新的源值,并且不断的重复这个过程。
 *
 * @example <caption>以每秒最多点击一次的频率发出点击事件</caption>
 * var clicks = Rx.Observable.fromEvent(document, 'click');
 * var result = clicks.audit(ev => Rx.Observable.interval(1000));
 * result.subscribe(x => console.log(x));
 *
 * @see {@link auditTime}
 * @see {@link debounce}
 * @see {@link delayWhen}
 * @see {@link sample}
 * @see {@link throttle}
 *
 * @param {function(value: T): SubscribableOrPromise} durationSelector 该函数从源 Observable 中接收值,用于为每个源值计算沉默持续时间,并返回 Observable 或 Promise 。
 * @return {Observable<T>} 该 Observable 限制源 Observable 的发送频率。
 * @method audit
 * @owner Observable
 */
export function audit(durationSelector) {
    return this.lift(new AuditOperator(durationSelector));
}
class AuditOperator {
    constructor(durationSelector) {
        this.durationSelector = durationSelector;
    }
    call(subscriber, source) {
        return source.subscribe(new AuditSubscriber(subscriber, this.durationSelector));
    }
}
/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class AuditSubscriber extends OuterSubscriber {
    constructor(destination, durationSelector) {
        super(destination);
        this.durationSelector = durationSelector;
        this.hasValue = false;
    }
    _next(value) {
        this.value = value;
        this.hasValue = true;
        if (!this.throttled) {
            const duration = tryCatch(this.durationSelector)(value);
            if (duration === errorObject) {
                this.destination.error(errorObject.e);
            }
            else {
                const innerSubscription = subscribeToResult(this, duration);
                if (innerSubscription.closed) {
                    this.clearThrottle();
                }
                else {
                    this.add(this.throttled = innerSubscription);
                }
            }
        }
    }
    clearThrottle() {
        const { value, hasValue, throttled } = this;
        if (throttled) {
            this.remove(throttled);
            this.throttled = null;
            throttled.unsubscribe();
        }
        if (hasValue) {
            this.value = null;
            this.hasValue = false;
            this.destination.next(value);
        }
    }
    notifyNext(outerValue, innerValue, outerIndex, innerIndex) {
        this.clearThrottle();
    }
    notifyComplete() {
        this.clearThrottle();
    }
}
//# sourceMappingURL=audit.js.map