es6/operator/repeatWhen.js
import { Subject } from '../Subject';
import { tryCatch } from '../util/tryCatch';
import { errorObject } from '../util/errorObject';
import { OuterSubscriber } from '../OuterSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
/**
* 返回的 Observalbe 是源 Observable 的镜像,除了 `complete` 。如果源 Observable 调用了 `complete`,这个方法会发出给 `notifier`
* 返回的 Observable 。如果这个 Observale 调用了 `complete` 或 `error`,那么这个方法会在子 subscription 上调用
* `complete` 或 `error` 。否则,此方法将重新订阅源 Observable。
*
* <img src="./img/repeatWhen.png" width="100%">
*
* @param {function(notifications: Observable): Observable} notifier - 接收 Observable 的通知,用户可以该通知
* 的 `complete` 或`error` 来中止重复。
* @return {Observable} 使用重复逻辑修改过的源 Observable 。
* @method repeatWhen
* @owner Observable
*/
export function repeatWhen(notifier) {
return this.lift(new RepeatWhenOperator(notifier));
}
class RepeatWhenOperator {
constructor(notifier) {
this.notifier = notifier;
}
call(subscriber, source) {
return source.subscribe(new RepeatWhenSubscriber(subscriber, this.notifier, source));
}
}
/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class RepeatWhenSubscriber extends OuterSubscriber {
constructor(destination, notifier, source) {
super(destination);
this.notifier = notifier;
this.source = source;
this.sourceIsBeingSubscribedTo = true;
}
notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
this.sourceIsBeingSubscribedTo = true;
this.source.subscribe(this);
}
notifyComplete(innerSub) {
if (this.sourceIsBeingSubscribedTo === false) {
return super.complete();
}
}
complete() {
this.sourceIsBeingSubscribedTo = false;
if (!this.isStopped) {
if (!this.retries) {
this.subscribeToRetries();
}
else if (this.retriesSubscription.closed) {
return super.complete();
}
this._unsubscribeAndRecycle();
this.notifications.next();
}
}
_unsubscribe() {
const { notifications, retriesSubscription } = this;
if (notifications) {
notifications.unsubscribe();
this.notifications = null;
}
if (retriesSubscription) {
retriesSubscription.unsubscribe();
this.retriesSubscription = null;
}
this.retries = null;
}
_unsubscribeAndRecycle() {
const { notifications, retries, retriesSubscription } = this;
this.notifications = null;
this.retries = null;
this.retriesSubscription = null;
super._unsubscribeAndRecycle();
this.notifications = notifications;
this.retries = retries;
this.retriesSubscription = retriesSubscription;
return this;
}
subscribeToRetries() {
this.notifications = new Subject();
const retries = tryCatch(this.notifier)(this.notifications);
if (retries === errorObject) {
return super.complete();
}
this.retries = retries;
this.retriesSubscription = subscribeToResult(this, retries);
}
}
//# sourceMappingURL=repeatWhen.js.map