es6/observable/BoundNodeCallbackObservable.js
import { Observable } from '../Observable';
import { tryCatch } from '../util/tryCatch';
import { errorObject } from '../util/errorObject';
import { AsyncSubject } from '../AsyncSubject';
/**
* We need this JSDoc comment for affecting ESDoc.
* @extends {Ignored}
* @hide true
*/
export class BoundNodeCallbackObservable extends Observable {
constructor(callbackFunc, selector, args, context, scheduler) {
super();
this.callbackFunc = callbackFunc;
this.selector = selector;
this.args = args;
this.context = context;
this.scheduler = scheduler;
}
/* tslint:enable:max-line-length */
/**
* 把 Node.js 式回调API转换为返回 Observable 的函数。
*
* <span class="informal">就像是 {@link bindCallback}, 但是回调函数必须形如
* `callback(error, result)`这样</span>
*
* `bindNodeCallback` 并不是一个操作符,因为它的输入和输出并不是 Observable。输入的是一个
* 带有多个参数的函数,并且该函数的最后一个参数必须是个回调函数,当该函数执行完之后会掉
* 用回调函数,回调函数被要求遵循 Node.js 公约,其中第一个参数是错误对象,标示调用是否成功。
* 如果这个错误对象被传递给了回调函数,这意味着调用出现了错误。
*
* `bindNodeCallback` 的输出是一个函数,该函数接受的参数和输入函数一样(除了没有最后一个回调函
* 数)。当输出函数被调用,会返回一个 Observable 。如果输入函数带着错误对象调用回调函数,Observable
* 也会用这个错误对象触发错误状态。如果错误对象没有被传递,Observable 会发出第二个参数。
* 如果输入函数给回调函数传递三个或者更多的值,该 Observable 会发出一个包含除了第一个错误参
* 数的所有值的数组。
*
* `bindNodeCallback`接受可选的选择器函数,它允许 Observable 发出由选择器计算的值,而
* 不是普通的回调参数。这和{@link bindCallback}的选择器效果类似,但是 node 式的错误参数永远
* 不会传递给该函数。
*
* 注意,输入函数永远不会被调用直到输出函数返回的 Observable 被订阅。默认情况下,订阅后会同步调用
* 输入方法, 但是这可以被改变,通过使用{@link Scheduler}作为可选的第三个参数。
* 调度器可以控制 Observable 何时发出数据。想要获取更多信息,请查看{@link bindCallback}的文档,
* 工作原理完全一样。
*
* 和{@link bindCallback}一样,输入函数的上下文(this)将会被设置给输出函数的上下文,当它被调用
* 的时候。当 Observable 发出了数据后,它会立马完成。这意味着即使输入函数再次调用回调函数,第二次以
* 及后续调用的值永远不会出现在流中。如果你需要处理多次调用,查看{@link fromEvent}或者{@link fromEventPattern}
* 来替代。
*
* 注意,`bindNodeCallback`同样可以用在非 Node.js 环境中,Node.js 式回调函数仅仅是一种公约,所以
* 如果你的目标环境是浏览器或者其他,并且你使用的API遵守了这种回调公约,`bindNodeCallback`就可以
* 安全的使用那些API函数。
*
* 牢记,传递给回调的错误对象并不是 JavaScript 内置的 Error 的实例。事实上,它甚至可以不是对象。
* 回调函数的错误参数被解读为“存在”,当该参数有值的时候。它可以是,例如,非0数字,非空字符串,逻辑
* 是。在所有这些情况下,都会触发 Observable 的错误状态。这意味着当使用`bindNodeCallback`
* 的时候通常形式的回调函数都会触发失败。如果你的 Observable 经常发生你预料之外的错误,请检查下
* 回调函数是否是 node.js 式的回调,如果不是,请使用{@link bindCallback}替代。
*
* 注意,即使错误参数出现在回调函数中,但是它的值是假值,它仍然不会出现在Observable的发出数组或者选择器中。
*
* @example <caption>从文件系统中读取文件并且从 Observable 中获取数据。</caption>
* import * as fs from 'fs';
* var readFileAsObservable = Rx.Observable.bindNodeCallback(fs.readFile);
* var result = readFileAsObservable('./roadNames.txt', 'utf8');
* result.subscribe(x => console.log(x), e => console.error(e));
*
*
* @example <caption>使用具有多个参数的函数调用回调</caption>
* someFunction((err, a, b) => {
* console.log(err); // null
* console.log(a); // 5
* console.log(b); // "some string"
* });
* var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction);
* boundSomeFunction()
* .subscribe(value => {
* console.log(value); // [5, "some string"]
* });
*
*
* @example <caption>使用选择器函数</caption>
* someFunction((err, a, b) => {
* console.log(err); // undefined
* console.log(a); // "abc"
* console.log(b); // "DEF"
* });
* var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction, (a, b) => a + b);
* boundSomeFunction()
* .subscribe(value => {
* console.log(value); // "abcDEF"
* });
*
*
* @example <caption>非 node.js 式的回调函数</caption>
* someFunction(a => {
* console.log(a); // 5
* });
* var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction);
* boundSomeFunction()
* .subscribe(
* value => {} // never gets called
* err => console.log(err) // 5
*);
*
*
* @see {@link bindCallback}
* @see {@link from}
* @see {@link fromPromise}
*
* @param {function} func 最后一个参数是 node.js 式回调的函数。
* @param {function} [selector] 选择器,从回调函数中获取参数并将这些映射为一个 Observable 发出的值。
* @param {Scheduler} [scheduler] 调度器,调度回调函数。
* @return {function(...params: *): 一个返回 Observable 的函数,该 Observable 发出 node.js 式回调函数返回的数据。
* @static true
* @name bindNodeCallback
* @owner Observable
*/
static create(func, selector = undefined, scheduler) {
return function (...args) {
return new BoundNodeCallbackObservable(func, selector, args, this, scheduler);
};
}
_subscribe(subscriber) {
const callbackFunc = this.callbackFunc;
const args = this.args;
const scheduler = this.scheduler;
let subject = this.subject;
if (!scheduler) {
if (!subject) {
subject = this.subject = new AsyncSubject();
const handler = function handlerFn(...innerArgs) {
const source = handlerFn.source;
const { selector, subject } = source;
const err = innerArgs.shift();
if (err) {
subject.error(err);
}
else if (selector) {
const result = tryCatch(selector).apply(this, innerArgs);
if (result === errorObject) {
subject.error(errorObject.e);
}
else {
subject.next(result);
subject.complete();
}
}
else {
subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs);
subject.complete();
}
};
// use named function instance to avoid closure.
handler.source = this;
const result = tryCatch(callbackFunc).apply(this.context, args.concat(handler));
if (result === errorObject) {
subject.error(errorObject.e);
}
}
return subject.subscribe(subscriber);
}
else {
return scheduler.schedule(dispatch, 0, { source: this, subscriber, context: this.context });
}
}
}
function dispatch(state) {
const self = this;
const { source, subscriber, context } = state;
// XXX: cast to `any` to access to the private field in `source`.
const { callbackFunc, args, scheduler } = source;
let subject = source.subject;
if (!subject) {
subject = source.subject = new AsyncSubject();
const handler = function handlerFn(...innerArgs) {
const source = handlerFn.source;
const { selector, subject } = source;
const err = innerArgs.shift();
if (err) {
self.add(scheduler.schedule(dispatchError, 0, { err, subject }));
}
else if (selector) {
const result = tryCatch(selector).apply(this, innerArgs);
if (result === errorObject) {
self.add(scheduler.schedule(dispatchError, 0, { err: errorObject.e, subject }));
}
else {
self.add(scheduler.schedule(dispatchNext, 0, { value: result, subject }));
}
}
else {
const value = innerArgs.length <= 1 ? innerArgs[0] : innerArgs;
self.add(scheduler.schedule(dispatchNext, 0, { value, subject }));
}
};
// use named function to pass values in without closure
handler.source = source;
const result = tryCatch(callbackFunc).apply(context, args.concat(handler));
if (result === errorObject) {
self.add(scheduler.schedule(dispatchError, 0, { err: errorObject.e, subject }));
}
}
self.add(subject.subscribe(subscriber));
}
function dispatchNext(arg) {
const { value, subject } = arg;
subject.next(value);
subject.complete();
}
function dispatchError(arg) {
const { err, subject } = arg;
subject.error(err);
}
//# sourceMappingURL=BoundNodeCallbackObservable.js.map