es6/observable/BoundCallbackObservable.js
import { Observable } from '../Observable';
import { tryCatch } from '../util/tryCatch';
import { errorObject } from '../util/errorObject';
import { AsyncSubject } from '../AsyncSubject';
/**
* We need this JSDoc comment for affecting ESDoc.
* @extends {Ignored}
* @hide true
*/
export class BoundCallbackObservable extends Observable {
constructor(callbackFunc, selector, args, context, scheduler) {
super();
this.callbackFunc = callbackFunc;
this.selector = selector;
this.args = args;
this.context = context;
this.scheduler = scheduler;
}
/* tslint:enable:max-line-length */
/**
* 把回调API转化为返回Observable的函数。
*
* <span class="informal">给它一个签名为`f(x, callback)`的函数f,返回一个函数g,
* 调用'g(x)'的时候会返回一个 Observable。</span>
*
* `bindCallback` 并不是一个操作符,因为它的输入和输出并不是 Observable 。输入的是一个
* 带有多个参数的函数,并且该函数的最后一个参数必须是个回调函数,当该函数执行完之后会调用回调函数。
*
* `bindCallback` 的输出是一个函数,该函数接受的参数和输入函数一样(除了没有最后一个回调函
* 数)。当输出函数被调用,会返回一个 Observable 。如果输入函数给回调函数传递一个值,则该 Observable
* 会发出这个值。如果输入函数给回调函数传递多个值,则该 Observable 会发出一个包含所有值的数组。
*
* 很重要的一点是,输出函数返回的 Observable 被订阅之前,输入函数是不会执行的。这意味着如果输入
* 函数发起 AJAX 请求,那么该请求在每次订阅返回的 Observable 之后才会发出,而不是之前。
*
* 作为一个可选项,selector 函数可以传给`bindObservable`。该函数接受和回调一样的参数。返回 Observable
* 发出的值,而不是回调参数本身,即使在默认情况下,传递给回调的多个参数将在流中显示为数组。选择器
* 函数直接用参数调用,就像回调一样。这意味着你可以想象默认选择器(当没有显示提供的时候)是这样
* 一个函数:将它的所有参数聚集到数组中,或者仅仅返回第一个参数(当只有一个参数的时候)。
*
* 最后一个可选参数 - {@link Scheduler} - 当 Observable 被订阅的时候,可以用来控制调用输入函
* 数以及发出结果的时机。默认订阅 Observable 后调用输入函数是同步的,但是使用`Scheduler.async`
* 作为最后一个参数将会延迟输入函数的调用,就像是用0毫秒的setTimeout包装过。所以如果你使用了异
* 步调度器并且订阅了 Observable ,当前正在执行的所有函数调用,将在调用“输入函数”之前结束。
*
* 当涉及到传递给回调的结果时,默认情况下当输入函数调用回调之后会立马发出,特别是如果回调也是同步调动的话,
* 那么 Observable 的订阅也会同步调用`next`方法。如果你想延迟调用,使用`Scheduler.async`。
* 这意味着通过使用`Scheduler.async`,你可以确保输入函数永远异步调用回调函数,从而避免了可怕的Zalgo。
*
* 需要注意的是,输出函数返回的Observable只能发出一次然后完成。即使输入函数多次调用回调函数,第二次
* 以及之后的调用都不会出现在流中。如果你需要监听多次的调用,你大概需要使用{@link fromEvent}或者
* {@link fromEventPattern}来代替。
*
* 如果输入函数依赖上下文(this),该上下文将被设置为输出函数在调用时的同一上下文。特别是如果输入函数
* 被当作是某个对象的方法进行调用,为了保持同样的行为,建议将输出函数的上下文设置为该对象,输入方法不
* 是已经绑定好的。
*
* 如果输入函数以 node 的方式(第一个参数是可选的错误参数用来标示调用是否成功)调用回调函数,{@link bindNodeCallback}
* 提供了方便的错误处理,也许是更好的选择。 `bindCallback` 不会区别对待这些方法,错误参数(是否传递)
* 被解释成正常的参数。
*
* @example <caption>把jQuery的getJSON方法转化为Observable API</caption>
* // 假设我们有这个方法:jQuery.getJSON('/my/url', callback)
* var getJSONAsObservable = Rx.Observable.bindCallback(jQuery.getJSON);
* var result = getJSONAsObservable('/my/url');
* result.subscribe(x => console.log(x), e => console.error(e));
*
*
* @example <caption>接收传递给回调的数组参数。</caption>
* someFunction((a, b, c) => {
* console.log(a); // 5
* console.log(b); // 'some string'
* console.log(c); // {someProperty: 'someValue'}
* });
*
* const boundSomeFunction = Rx.Observable.bindCallback(someFunction);
* boundSomeFunction().subscribe(values => {
* console.log(values) // [5, 'some string', {someProperty: 'someValue'}]
* });
*
*
* @example <caption>使用带 selector 函数的 bindCallback。</caption>
* someFunction((a, b, c) => {
* console.log(a); // 'a'
* console.log(b); // 'b'
* console.log(c); // 'c'
* });
*
* const boundSomeFunction = Rx.Observable.bindCallback(someFunction, (a, b, c) => a + b + c);
* boundSomeFunction().subscribe(value => {
* console.log(value) // 'abc'
* });
*
*
* @example <caption>对使用和不使用 async 调度器的行为进行比较。</caption>
* function iCallMyCallbackSynchronously(cb) {
* cb();
* }
*
* const boundSyncFn = Rx.Observable.bindCallback(iCallMyCallbackSynchronously);
* const boundAsyncFn = Rx.Observable.bindCallback(iCallMyCallbackSynchronously, null, Rx.Scheduler.async);
*
* boundSyncFn().subscribe(() => console.log('I was sync!'));
* boundAsyncFn().subscribe(() => console.log('I was async!'));
* console.log('This happened...');
*
* // Logs:
* // I was sync!
* // This happened...
* // I was async!
*
*
* @example <caption>在对象方法上使用 bindCallback</caption>
* const boundMethod = Rx.Observable.bindCallback(someObject.methodWithCallback);
* boundMethod.call(someObject) // 确保methodWithCallback可以访问someObject
* .subscribe(subscriber);
*
*
* @see {@link bindNodeCallback}
* @see {@link from}
* @see {@link fromPromise}
*
* @param {function} func 最后一个参数是回调的函数。
* @param {function} [selector] 选择器,从回调函数中获取参数并将这些映射为一个 Observable 发出的值。
* @param {Scheduler} [scheduler] 调度器,调度回调函数。
* @return {function(...params: *): Observable} 一个返回Observable的函数,该Observable发出回调函数返回的数据。
* @static true
* @name bindCallback
* @owner Observable
*/
static create(func, selector = undefined, scheduler) {
return function (...args) {
return new BoundCallbackObservable(func, selector, args, this, scheduler);
};
}
_subscribe(subscriber) {
const callbackFunc = this.callbackFunc;
const args = this.args;
const scheduler = this.scheduler;
let subject = this.subject;
if (!scheduler) {
if (!subject) {
subject = this.subject = new AsyncSubject();
const handler = function handlerFn(...innerArgs) {
const source = handlerFn.source;
const { selector, subject } = source;
if (selector) {
const result = tryCatch(selector).apply(this, innerArgs);
if (result === errorObject) {
subject.error(errorObject.e);
}
else {
subject.next(result);
subject.complete();
}
}
else {
subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs);
subject.complete();
}
};
// use named function instance to avoid closure.
handler.source = this;
const result = tryCatch(callbackFunc).apply(this.context, args.concat(handler));
if (result === errorObject) {
subject.error(errorObject.e);
}
}
return subject.subscribe(subscriber);
}
else {
return scheduler.schedule(BoundCallbackObservable.dispatch, 0, { source: this, subscriber, context: this.context });
}
}
static dispatch(state) {
const self = this;
const { source, subscriber, context } = state;
const { callbackFunc, args, scheduler } = source;
let subject = source.subject;
if (!subject) {
subject = source.subject = new AsyncSubject();
const handler = function handlerFn(...innerArgs) {
const source = handlerFn.source;
const { selector, subject } = source;
if (selector) {
const result = tryCatch(selector).apply(this, innerArgs);
if (result === errorObject) {
self.add(scheduler.schedule(dispatchError, 0, { err: errorObject.e, subject }));
}
else {
self.add(scheduler.schedule(dispatchNext, 0, { value: result, subject }));
}
}
else {
const value = innerArgs.length <= 1 ? innerArgs[0] : innerArgs;
self.add(scheduler.schedule(dispatchNext, 0, { value, subject }));
}
};
// use named function to pass values in without closure
handler.source = source;
const result = tryCatch(callbackFunc).apply(context, args.concat(handler));
if (result === errorObject) {
subject.error(errorObject.e);
}
}
self.add(subject.subscribe(subscriber));
}
}
function dispatchNext(arg) {
const { value, subject } = arg;
subject.next(value);
subject.complete();
}
function dispatchError(arg) {
const { err, subject } = arg;
subject.error(err);
}
//# sourceMappingURL=BoundCallbackObservable.js.map