Observable
Direct Subclass:
Indirect Subclass:
表示在任意时间内的任意一组值。 这是 RxJS 最基本的构建块。
Static Method Summary
Static Public Methods | ||
public static |
bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable 把回调API转化为返回Observable的函数。 |
|
public static |
bindNodeCallback(func: function, selector: function, scheduler: Scheduler): * 把 Node.js 式回调API转换为返回 Observable 的函数。 |
|
public static |
combineLatest(observable1: ObservableInput, observable2: ObservableInput, project: function, scheduler: Scheduler): Observable 组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。 |
|
public static |
concat(input1: ObservableInput, input2: ObservableInput, scheduler: Scheduler): Observable 创建一个输出 Observable,该 Observable 顺序的发出每个输入 Observable 的所有值。 |
|
public static |
create(onSubscription: function(observer: Observer): TeardownLogic): Observable 创建一个新的 Observable ,当观察者( Observer )订阅该 Observable 时,它会执行指定的函数。 |
|
public static |
defer(observableFactory: function(): SubscribableOrPromise): Observable 创建一个 Observable,当被订阅的时候,调用 Observable 工厂为每个观察者创建新的 Observable。 |
|
public static |
empty(scheduler: Scheduler): Observable 创建一个什么数据都不发出并且立马完成的 Observable。 |
|
public static |
forkJoin(sources: *): any |
|
public static |
from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T> 从一个数组、类数组对象、Promise、迭代器对象或者类 Observable 对象创建一个 Observable. |
|
public static |
fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T> 创建一个 Observable,该 Observable 发出来自给定事件对象的指定类型事件。 |
|
public static |
fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function, signal?: any): void, selector: function(...args: any): T): Observable<T> 从一个基于 addHandler/removeHandler 方法的API创建 Observable。 |
|
public static |
fromPromise(promise: PromiseLike<T>, scheduler: Scheduler): Observable<T> 将 Promise 转化为 Observable。 |
|
public static |
interval(period: number, scheduler: Scheduler): Observable 创建一个 Observable ,该 Observable 使用指定的 IScheduler ,并以指定时间间隔发出连续的数字。 |
|
public static |
merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable 创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中值。 |
|
public static |
never(): Observable 创建一个不向观察者发出任何项的 Observable 。 |
|
public static |
of(values: ...T, scheduler: Scheduler): Observable<T> 创建一个 Observable,它会依次发出由你提供的参数,最后发出完成通知。 发出你提供的参数,然后完成。 |
|
public static |
range(start: number, count: number, scheduler: Scheduler): Observable 创建一个 Observable ,它发出指定范围内的数字序列。 |
|
public static |
throw(将具体的: any, scheduler: Scheduler): Observable 创建一个不发送数据给观察者并且立马发出错误通知的 Observable。 |
|
public static |
创建一个 Observable,该 Observable 在初始延时( |
|
public static |
webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject 包装浏览器提供的兼容w3c的WebSocket对象. |
|
public static |
zip(observables: *): Observable<R> 将多个 Observable 组合以创建一个 Observable,该 Observable 的值是由所有输入 Observables 的值按顺序计算而来的。 |
Constructor Summary
Public Constructor | ||
public |
constructor(subscribe: Function) |
Method Summary
Public Methods | ||
public |
An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable |
|
public |
audit(durationSelector: function(value: T): SubscribableOrPromise): Observable<T> 在另一个 Observable 决定的时间段里忽略源数据,然后发出源 Observable 中最新发出的值, 然后重复此过程。 |
|
public |
auditTime(duration: number, scheduler: Scheduler): Observable<T> duration 毫秒内忽略源值,然后发出源 Observable 的最新值, 并且重复此过程。 |
|
public |
buffer(closingNotifier: Observable<any>): Observable<T[]> 缓冲源 Observable 的值直到 |
|
public |
bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]> 缓冲源 Observable 的值直到缓冲数量到达设定的 |
|
public |
bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]> 在特定时间周期内缓冲源 Observable 的值。 |
|
public |
bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]> 缓冲源 Observable 的值, |
|
public |
bufferWhen(closingSelector: function(): Observable): Observable<T[]> 缓冲源 Observable 的值, 使用关闭 Observable 的工厂函数来决定何时关闭、发出和重置缓冲区。 |
|
public |
catch(selector: function): Observable 捕获 observable 中的错误,可以通过返回一个新的 observable 或者抛出错误对象来处理。 |
|
public |
combineAll(project: function): Observable 通过等待外部 Observable 完成然后应用 combineLatest ,将高阶 Observable 转化为一阶 Observable。 |
|
public |
combineLatest(other: ObservableInput, project: function): Observable 组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。 |
|
public |
concat(other: ObservableInput, scheduler: Scheduler): Observable 创建一个输出 Observable,它在当前 Observable 之后顺序地发出每个给定的输入 Observable 中的所有值。 |
|
public |
通过顺序地连接内部 Observable,将高阶 Observable 转化为一阶 Observable 。 |
|
public |
concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable 将源值投射为一个合并到输出 Observable 的 Observable,以串行的方式等待前一个完成再合并下一个 Observable。 |
|
public |
concatMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable 将每个源值投射成同一个 Observable ,该 Observable 会以串行的方式多次合并到输出 Observable 中 。 |
|
public |
count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable 计算源的发送数量,并当源完成时发出该数值。 |
|
public |
debounce(durationSelector: function(value: T): SubscribableOrPromise): Observable 只有在另一个 Observable 决定的一段特定时间经过后并且没有发出另一个源值之后,才从源 Observable 中发出一个值。 |
|
public |
debounceTime(dueTime: number, scheduler: Scheduler): Observable 只有在特定的一段时间经过后并且没有发出另一个源值,才从源 Observable 中发出一个值。 |
|
public |
defaultIfEmpty(defaultValue: any): Observable 如果源 Observable 在完成之前没有发出任何 next 值,则发出给定的值,否则返回 Observable 的镜像。 |
|
public |
delay(delay: number | Date, scheduler: Scheduler): Observable 通过给定的超时或者直到一个给定的时间来延迟源 Observable 的发送。 |
|
public |
delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable 在给定的时间范围内,延迟源 Observable 所有数据项的发送,该时间段由另一个 Observable 的发送决定。 |
|
public |
将 Notification 对象的 Observable 转换成它们所代表的发送。 |
|
public |
distinct(keySelector: function, flushes: Observable): Observable 返回 Observable,它发出由源 Observable 所发出的所有与之前的项都不相同的项。 |
|
public |
distinctUntilChanged(compare: function): Observable 返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项。 |
|
public |
distinctUntilKeyChanged(key: string, compare: function): Observable 返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项,使用通过提供的 key 访问到的属性来检查两个项是否不同。 |
|
public |
为源 Observable 上的每次发送执行副作用,但返回的 Observable 与源 Observable 是相同的。 |
|
public |
elementAt(index: number, defaultValue: T): Observable 只发出单个值,这个值位于源 Observable 的发送序列中的指定 |
|
public |
every(predicate: function, thisArg: any): Observable 返回的 Observable 发出是否源 Observable 的每项都满足指定的条件。 |
|
public |
当前一个内部 Observable 还未完成的情况下,通过丢弃内部 Observable 使得 高阶 Observable 转换成一阶 Observable。 |
|
public |
exhaustMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable 将每个源值投射成 Observable,只有当前一个投射的 Observable 已经完成, 这个 Observable 才会被合并到输出 Observable 中。 |
|
public |
expand(project: function(value: T, index: number), concurrent: number): Observable 递归地将每个源值投射成 Observable,这个 Observable 会被合并到输出 Observable 中。 |
|
public |
filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable 通过只发送源 Observable 的中满足指定 predicate 函数的项来进行过滤。 |
|
public |
find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T> 只发出源 Observable 所发出的值中第一个满足条件的值。 |
|
public |
findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T> 只发出源 Observable 所发出的值中第一个满足条件的值的索引。 |
|
public |
first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R> 只发出由源 Observable 所发出的值中第一个(或第一个满足条件的值)。 |
|
public |
|
|
public |
groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>> 根据指定条件将源 Observable 发出的值进行分组,并将这些分组作为 |
|
public |
忽略源 Observable 所发送的所有项,只传递 |
|
public |
如果源 Observable 是空的话,它返回一个发出 true 的 Observable,否则发出 false 。 |
|
public |
last(predicate: function): Observable 返回的 Observable 只发出由源 Observable 发出的最后一个值。它可以接收一个可选的 predicate 函数作为 参数,如果传入 predicate 的话则发送的不是源 Observable 的最后一项,而是发出源 Observable 中 满足 predicate 函数的最后一项。 |
|
public |
letProto(func: *): Observable<R> |
|
public |
lift(operator: Operator): Observable 创建一个新的 Observable,以它作为源,并传递操作符的定义作为新的 observable 操作符。 |
|
public |
map(project: function(value: T, index: number): R, thisArg: any): Observable<R> 将给定的 |
|
public |
mapTo(value: any): Observable 每次源 Observble 发出值时,都在输出 Observable 上发出给定的常量值。 |
|
public |
materialize(): Observable<Notification<T>> 表示源 Observable 中的所有通知,每个通知都会在 Notification 对象中标记为
它们原始的通知类型,并会作为输出 Observable 的 |
|
public |
max(comparer: Function): Observable
|
|
public |
merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable 创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中的所有值。 |
|
public |
mergeAll(concurrent: number): Observable 将高阶 Observable 转换成一阶 Observable ,一阶 Observable 会同时发出在内部 Observables 上发出的所有值。 |
|
public |
mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable 将每个源值投射成 Observable ,该 Observable 会合并到输出 Observable 中。 |
|
public |
mergeMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable 将每个源值投射成同一个 Observable ,该 Observable 会多次合并到输出 Observable 中。 |
|
public |
mergeScan(accumulator: function(acc: R, value: T): Observable<R>, seed: *, concurrent: number): Observable<R> 在源 Observable 上应用 accumulator 函数,其中 accumulator 函数本身返回 Observable ,然后每个返回的中间 Observable 会被合并到输出 Observable 中。 |
|
public |
min(comparer: Function): Observable<R>
|
|
public |
multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable 返回的 Observable 发出对 ConnectableObservable 发出的项调用一个指定的 selector 函数的结果, ConnectableObservable 可以在潜在的多个流之间共享单个 subscription 。 |
|
public |
observeOn(scheduler: IScheduler, delay: number): Observable<T> 使用指定的调度器来重新发出源 Observable 的所有通知。 |
|
public |
onErrorResumeNext(observables: ...ObservableInput): Observable 当任何提供的 Observable 发出完成或错误通知时,它会立即地订阅已传入下一个 Observable 。 |
|
public |
pairwise(): Observable<Array<T>> 将一系列连续的发送成对的组合在一起,并将这些分组作为两个值的数组发出。 |
|
public |
partition(predicate: function(value: T, index: number): boolean, thisArg: any): [Observable<T>, Observable<T>] 将源 Observable 一分为二,一个是所有满足 predicate 函数的值,另一个是所有 不满足 predicate 的值。 |
|
public |
pluck(properties: ...string): Observable 将每个源值(对象)映射成它指定的嵌套属性。 |
|
public |
返回 ConnectableObservable,它是 Observable 的变种,它会一直等待,直到 connnect 方法被调用才会开始把值发送给那些订阅它的观察者。 |
|
public |
publishBehavior(value: *): ConnectableObservable<T> |
|
public |
|
|
public |
publishReplay(bufferSize: *, windowTime: *, scheduler: *): ConnectableObservable<T> |
|
public |
race(): Observable 返回 Observable,该 Observable 是源 Observable 和提供的 Observables 的组合中 第一个发出项的 Observable 的镜像。 |
|
public |
reduce(accumulator: function(acc: R, value: T, index: number): R, seed: R): Observable<R> 在源 Observalbe 上应用 accumulator (累加器) 函数,然后当源 Observable 完成时,返回 累加的结果,可以提供一个可选的 seed 值。 |
|
public |
repeat(count: number): Observable 返回的 Observable 重复由源 Observable 所发出的项的流,最多可以重复 count 次。 |
|
public |
repeatWhen(notifier: function(notifications: Observable): Observable): Observable 返回的 Observalbe 是源 Observable 的镜像,除了 |
|
public |
retry(count: number): Observable 返回一个 Observable, 该 Observable 是源 Observable 不包含错误异常的镜像。 如果源 Observable 发生错误, 这个方法不会传播错误而是会不 断的重新订阅源 Observable 直到达到最大重试次数 (由数字参数指定)。 |
|
public |
retryWhen(notifier: function(errors: Observable): Observable): Observable 返回一个 Observable, 该 Observable 是源 Observable 不包含错误异常的镜像。 如果源头 Observable 触发
|
|
public |
sample(notifier: Observable<any>): Observable<T> 发出源 Observable 最新发出的值当另一个 |
|
public |
sampleTime(period: number, scheduler: Scheduler): Observable<T> 在周期时间间隔内发出源 Observable 发出的最新值。 |
|
public |
scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R> 对源 Observable 使用累加器函数, 返回生成的中间值, 可选的初始值。 |
|
public |
sequenceEqual(compareTo: Observable, comparor: function): Observable 使用可选的比较函数,按顺序比较两个 Observables 的所有值,然后返回单个布尔值的 Observable, 以表示两个序列是否相等。 按顺序检查两个 Observables 所发出的所有值是否相等。 |
|
public |
share(): Observable<T> 返回一个新的 Observable,该 Observable 多播(共享)源 Observable。 至少要有一个订阅者,该 Observable 才会被订阅并发出数据。
当所有的订阅者都取消订阅了,它会取消对源 Observable 的订阅。 因为 Observable 是多路传播的它使得流是 |
|
public |
shareReplay(bufferSize: *, windowTime: *, scheduler: *): * |
|
public |
single(predicate: Function): Observable<T> 该 Observable 发出源 Observable 所发出的值中匹配指定 predicate 函数的单个项。 如果源 Observable 发出多于1个数据项或者没有发出数据项, 分别以 IllegalArgumentException 和 NoSuchElementException 进行通知。 |
|
public |
skip(count: Number): Observable 返回一个 Observable, 该 Observable 跳过源 Observable 发出的前N个值(N = count)。 |
|
public |
skipLast(count: number): Observable<T> 跳过源 Observable 最后发出的的N个值 (N = count)。 |
|
public |
skipUntil(notifier: Observable): Observable<T> 返回一个 Observable,该 Observable 会跳过源 Observable 发出的值直到第二个 Observable 开始发送。 |
|
public |
skipWhile(predicate: Function): Observable<T> 返回一个 Observable, 该 Observable 会跳过由源 Observable 发出的所有满足指定条件的数据项, 但是一旦出现了不满足条件的项,则发出在此之后的所有项。 |
|
public |
startWith(values: ...T, scheduler: Scheduler): Observable 返回的 Observable 会先发出作为参数指定的项,然后再发出由源 Observable 所发出的项。 |
|
public |
调用 Observable 的执行并注册 Observer 的处理器以便于发出通知。 |
|
public |
subscribeOn(scheduler: Scheduler): Observable<T> 使用指定的 IScheduler 异步地订阅此 Observable 的观察者。 |
|
public |
switch(): Observable<T> 通过只订阅最新发出的内部 Observable ,将高阶 Observable 转换成一阶 Observable 。 |
|
public |
switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable 将每个源值投射成 Observable,该 Observable 会合并到输出 Observable 中, 并且只发出最新投射的 Observable 中的值。 |
|
public |
switchMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable 将每个源值投射成同一个 Observable ,该 Observable 会使用 switch 多次被打平 到输出 Observable 中。 |
|
public |
take(count: number): Observable<T> 只发出源 Observable 最初发出的的N个值 (N = |
|
public |
takeLast(count: number): Observable<T> 只发出源 Observable 最后发出的的N个值 (N = |
|
public |
takeUntil(notifier: Observable): Observable<T> 发出源 Observable 发出的值,直到 |
|
public |
takeWhile(predicate: function(value: T, index: number): boolean): Observable<T> 发出在源 Observable 中满足 |
|
public |
throttle(durationSelector: function(value: T): SubscribableOrPromise, config: Object): Observable<T> 从源 Observable 中发出一个值,然后在由另一个 Observable 决定的期间内忽略 随后发出的源值,然后重复此过程。 |
|
public |
throttleTime(duration: number, scheduler: Scheduler): Observable<T> 从源 Observable 中发出一个值,然后在 |
|
public |
timeInterval(scheduler: *): Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T> |
|
public |
timeout(due: number, scheduler: Scheduler): Observable<R> | WebSocketSubject<T> | Observable<T> |
|
public |
timeoutWith(due: *, withObservable: *, scheduler: *): Observable<R> | WebSocketSubject<T> | Observable<T> |
|
public |
timestamp(scheduler: *): Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T> |
|
public |
toArray(): Observable<any[]> | WebSocketSubject<T> | Observable<T> |
|
public |
将 Observable 序列转换为符合 ES2015 标准的 Promise 。 |
|
public |
window(windowBoundaries: Observable<any>): Observable<Observable<T>> 每当 |
|
public |
windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>> 将源 Observable 的值分支成多个嵌套的 Observable ,每个嵌套的 Observable 最多发出 windowSize 个值。 |
|
public |
windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>> 将源 Observable 的值分支成嵌套的 Observable,分支策略是以 openings 发出项为起始,以 closingSelector 发出为结束。 |
|
public |
windowWhen(closingSelector: function(): Observable): Observable<Observable<T>> 将源 Observable 的值分支成嵌套的 Observable ,通过使用关闭 Observable 的工厂函数来决定何时开启新的窗口。 |
|
public |
withLatestFrom(other: ObservableInput, project: Function): Observable 结合源 Observable 和另外的 Observables 以创建新的 Observable, 该 Observable 的值由每 个 Observable 最新的值计算得出,当且仅当源发出的时候。 |
|
public |
zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T> |
|
public |
zipProto(observables: *): Observable<R> |
Static Public Methods
public static bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable source
把回调API转化为返回Observable的函数。
给它一个签名为f(x, callback)
的函数f,返回一个函数g,
调用'g(x)'的时候会返回一个 Observable。
bindCallback
并不是一个操作符,因为它的输入和输出并不是 Observable 。输入的是一个
带有多个参数的函数,并且该函数的最后一个参数必须是个回调函数,当该函数执行完之后会调用回调函数。
bindCallback
的输出是一个函数,该函数接受的参数和输入函数一样(除了没有最后一个回调函
数)。当输出函数被调用,会返回一个 Observable 。如果输入函数给回调函数传递一个值,则该 Observable
会发出这个值。如果输入函数给回调函数传递多个值,则该 Observable 会发出一个包含所有值的数组。
很重要的一点是,输出函数返回的 Observable 被订阅之前,输入函数是不会执行的。这意味着如果输入 函数发起 AJAX 请求,那么该请求在每次订阅返回的 Observable 之后才会发出,而不是之前。
作为一个可选项,selector 函数可以传给bindObservable
。该函数接受和回调一样的参数。返回 Observable
发出的值,而不是回调参数本身,即使在默认情况下,传递给回调的多个参数将在流中显示为数组。选择器
函数直接用参数调用,就像回调一样。这意味着你可以想象默认选择器(当没有显示提供的时候)是这样
一个函数:将它的所有参数聚集到数组中,或者仅仅返回第一个参数(当只有一个参数的时候)。
最后一个可选参数 - Scheduler - 当 Observable 被订阅的时候,可以用来控制调用输入函
数以及发出结果的时机。默认订阅 Observable 后调用输入函数是同步的,但是使用Scheduler.async
作为最后一个参数将会延迟输入函数的调用,就像是用0毫秒的setTimeout包装过。所以如果你使用了异
步调度器并且订阅了 Observable ,当前正在执行的所有函数调用,将在调用“输入函数”之前结束。
当涉及到传递给回调的结果时,默认情况下当输入函数调用回调之后会立马发出,特别是如果回调也是同步调动的话,
那么 Observable 的订阅也会同步调用next
方法。如果你想延迟调用,使用Scheduler.async
。
这意味着通过使用Scheduler.async
,你可以确保输入函数永远异步调用回调函数,从而避免了可怕的Zalgo。
需要注意的是,输出函数返回的Observable只能发出一次然后完成。即使输入函数多次调用回调函数,第二次 以及之后的调用都不会出现在流中。如果你需要监听多次的调用,你大概需要使用fromEvent或者 fromEventPattern来代替。
如果输入函数依赖上下文(this),该上下文将被设置为输出函数在调用时的同一上下文。特别是如果输入函数 被当作是某个对象的方法进行调用,为了保持同样的行为,建议将输出函数的上下文设置为该对象,输入方法不 是已经绑定好的。
如果输入函数以 node 的方式(第一个参数是可选的错误参数用来标示调用是否成功)调用回调函数,bindNodeCallback
提供了方便的错误处理,也许是更好的选择。 bindCallback
不会区别对待这些方法,错误参数(是否传递)
被解释成正常的参数。
Example:
// 假设我们有这个方法:jQuery.getJSON('/my/url', callback)
var getJSONAsObservable = Rx.Observable.bindCallback(jQuery.getJSON);
var result = getJSONAsObservable('/my/url');
result.subscribe(x => console.log(x), e => console.error(e));
someFunction((a, b, c) => {
console.log(a); // 5
console.log(b); // 'some string'
console.log(c); // {someProperty: 'someValue'}
});
const boundSomeFunction = Rx.Observable.bindCallback(someFunction);
boundSomeFunction().subscribe(values => {
console.log(values) // [5, 'some string', {someProperty: 'someValue'}]
});
someFunction((a, b, c) => {
console.log(a); // 'a'
console.log(b); // 'b'
console.log(c); // 'c'
});
const boundSomeFunction = Rx.Observable.bindCallback(someFunction, (a, b, c) => a + b + c);
boundSomeFunction().subscribe(value => {
console.log(value) // 'abc'
});
function iCallMyCallbackSynchronously(cb) {
cb();
}
const boundSyncFn = Rx.Observable.bindCallback(iCallMyCallbackSynchronously);
const boundAsyncFn = Rx.Observable.bindCallback(iCallMyCallbackSynchronously, null, Rx.Scheduler.async);
boundSyncFn().subscribe(() => console.log('I was sync!'));
boundAsyncFn().subscribe(() => console.log('I was async!'));
console.log('This happened...');
// Logs:
// I was sync!
// This happened...
// I was async!
const boundMethod = Rx.Observable.bindCallback(someObject.methodWithCallback);
boundMethod.call(someObject) // 确保methodWithCallback可以访问someObject
.subscribe(subscriber);
Test:
public static bindNodeCallback(func: function, selector: function, scheduler: Scheduler): * source
把 Node.js 式回调API转换为返回 Observable 的函数。
就像是 bindCallback, 但是回调函数必须形如
callback(error, result)
这样
bindNodeCallback
并不是一个操作符,因为它的输入和输出并不是 Observable。输入的是一个
带有多个参数的函数,并且该函数的最后一个参数必须是个回调函数,当该函数执行完之后会掉
用回调函数,回调函数被要求遵循 Node.js 公约,其中第一个参数是错误对象,标示调用是否成功。
如果这个错误对象被传递给了回调函数,这意味着调用出现了错误。
bindNodeCallback
的输出是一个函数,该函数接受的参数和输入函数一样(除了没有最后一个回调函
数)。当输出函数被调用,会返回一个 Observable 。如果输入函数带着错误对象调用回调函数,Observable
也会用这个错误对象触发错误状态。如果错误对象没有被传递,Observable 会发出第二个参数。
如果输入函数给回调函数传递三个或者更多的值,该 Observable 会发出一个包含除了第一个错误参
数的所有值的数组。
bindNodeCallback
接受可选的选择器函数,它允许 Observable 发出由选择器计算的值,而
不是普通的回调参数。这和bindCallback的选择器效果类似,但是 node 式的错误参数永远
不会传递给该函数。
注意,输入函数永远不会被调用直到输出函数返回的 Observable 被订阅。默认情况下,订阅后会同步调用 输入方法, 但是这可以被改变,通过使用Scheduler作为可选的第三个参数。 调度器可以控制 Observable 何时发出数据。想要获取更多信息,请查看bindCallback的文档, 工作原理完全一样。
和bindCallback一样,输入函数的上下文(this)将会被设置给输出函数的上下文,当它被调用 的时候。当 Observable 发出了数据后,它会立马完成。这意味着即使输入函数再次调用回调函数,第二次以 及后续调用的值永远不会出现在流中。如果你需要处理多次调用,查看fromEvent或者fromEventPattern 来替代。
注意,bindNodeCallback
同样可以用在非 Node.js 环境中,Node.js 式回调函数仅仅是一种公约,所以
如果你的目标环境是浏览器或者其他,并且你使用的API遵守了这种回调公约,bindNodeCallback
就可以
安全的使用那些API函数。
牢记,传递给回调的错误对象并不是 JavaScript 内置的 Error 的实例。事实上,它甚至可以不是对象。
回调函数的错误参数被解读为“存在”,当该参数有值的时候。它可以是,例如,非0数字,非空字符串,逻辑
是。在所有这些情况下,都会触发 Observable 的错误状态。这意味着当使用bindNodeCallback
的时候通常形式的回调函数都会触发失败。如果你的 Observable 经常发生你预料之外的错误,请检查下
回调函数是否是 node.js 式的回调,如果不是,请使用bindCallback替代。
注意,即使错误参数出现在回调函数中,但是它的值是假值,它仍然不会出现在Observable的发出数组或者选择器中。
Return:
* | {function(...params: *): 一个返回 Observable 的函数,该 Observable 发出 node.js 式回调函数返回的数据。 |
Example:
import * as fs from 'fs';
var readFileAsObservable = Rx.Observable.bindNodeCallback(fs.readFile);
var result = readFileAsObservable('./roadNames.txt', 'utf8');
result.subscribe(x => console.log(x), e => console.error(e));
someFunction((err, a, b) => {
console.log(err); // null
console.log(a); // 5
console.log(b); // "some string"
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction);
boundSomeFunction()
.subscribe(value => {
console.log(value); // [5, "some string"]
});
someFunction((err, a, b) => {
console.log(err); // undefined
console.log(a); // "abc"
console.log(b); // "DEF"
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction, (a, b) => a + b);
boundSomeFunction()
.subscribe(value => {
console.log(value); // "abcDEF"
});
someFunction(a => {
console.log(a); // 5
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction);
boundSomeFunction()
.subscribe(
value => {} // never gets called
err => console.log(err) // 5
);
public static combineLatest(observable1: ObservableInput, observable2: ObservableInput, project: function, scheduler: Scheduler): Observable source
组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。
它将使用所有输入中的最新值计算公式,然后发出该公式的输出。
combineLatest
结合所有输入 Observable 参数的值. 顺序订阅每个 Observable,
每当任一输入 Observable 发出,收集每个输入 Observable 的最新值组成一个数组。所以,当你给操作符
传入 n 个 Observable,返回的 Observable 总是会发出一个长度为 n 的数组,对应输入 Observable
的顺序(第一个 Observable 的值放到数组的第一个)。
静态版本的 combineLatest
接受一个 Observables 数组或者每个 Observable 可以直接作为参数。
请注意,如果你事先不知道你将要结合多少个 Observable, 那么 Observables 数组是一个好的选择。
传递空的数组将会导致返回 Observable 立马完成。
为了保证输出数组的长度相同,combineLatest
实际上会等待所有的输入 Observable 至少发出一次,
在返回 Observable 发出之前。这意味着如果某个输入 Observable 在其余的输入 Observable 之前发出,它所发出
的值只保留最新的。另一方面,如果某个输入 Observable 没有发出值就完成了,返回 Observable 也不会发
射值并立马完成,因为不可能从已经完成的 Observable 中收集到值。同样的,如果某个输入 Observable
不发出值也不完成,combineLatest
会永远不发出值也不结束。所以,再次强调下,它会等待所有的流
去发出值。
如果给combineLatest
至少传递一个输入 Observable 并且所有传入的输入 Observable 都发出了值,返回
Observable 将会在所有结合流完成后完成。所以即使某些 Observable 完成了,当其他输入 Observable
发出值的时候,combineLatest返回 Observable 仍然会发出值。对于完成的输入 Observable,它
的值一直是最后发出的值。另一方面,如果任一输入 Observable 发生错误,combineLatest
也会
立马触发错误状态,所有的其他输入 Observable 都会被解除订阅。
combineLatest
接受一个可选的参数投射函数,它接受返回 Observable 发出的值。投射函数
可以返回任何数据,这些数据代替默认的数组被返回 Observable 发出。需要注意的是,投射函数并不接
受值的数组,而是值本身。这意味着默认的投射函数就是一个接受所有参数并把它们放到一个数组里面的
函数。
Params:
Name | Type | Attribute | Description |
observable1 | ObservableInput | 用来和其他 Observables 进行结合的输入 Observable 。 |
|
observable2 | ObservableInput | 用来和其他 Observables 进行结合的输入 Observable 。 可以有多个输入Observables传入或者第一个参数是Observables数组 |
|
project | function |
|
投射成输出 Observable 上的一个新的值。 |
scheduler | Scheduler |
|
用来订阅每个输入 Observable 的调度器。 |
Example:
const firstTimer = Rx.Observable.timer(0, 1000); // 从现在开始,每隔1秒发出0, 1, 2...
const secondTimer = Rx.Observable.timer(500, 1000); // 0.5秒后,每隔1秒发出0, 1, 2...
const combinedTimers = Rx.Observable.combineLatest(firstTimer, secondTimer);
combinedTimers.subscribe(value => console.log(value));
// Logs
// [0, 0] after 0.5s
// [1, 0] after 1s
// [1, 1] after 1.5s
// [2, 1] after 2s
const observables = [1, 5, 10].map(
n => Rx.Observable.of(n).delay(n * 1000).startWith(0) // 先发出0,然后在 n 秒后发出 n。
);
const combined = Rx.Observable.combineLatest(observables);
combined.subscribe(value => console.log(value));
// 日志
// [0, 0, 0] 立刻
// [1, 0, 0] 1s 后
// [1, 5, 0] 5s 后
// [1, 5, 10] 10s 后
var weight = Rx.Observable.of(70, 72, 76, 79, 75);
var height = Rx.Observable.of(1.76, 1.77, 1.78);
var bmi = Rx.Observable.combineLatest(weight, height, (w, h) => w / (h * h));
bmi.subscribe(x => console.log('BMI is ' + x));
// 控制台输出:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222
public static concat(input1: ObservableInput, input2: ObservableInput, scheduler: Scheduler): Observable source
创建一个输出 Observable,该 Observable 顺序的发出每个输入 Observable 的所有值。
连接多个输入 Observable,顺序的发出它们的值,一个 Observable 接一个 Observable。
concat
通过一次订阅一个将多个 Observables 连接起来,并将值合并到输出 Observable 中。
你可以传递一个输入 Observable 数组,或者直接把它们当做参数传递。 传递一个空数组会
导致输出 Observable 立马触发完成状态。
concat
会订阅第一个输入 Observable 并且发出它的所有值, 不去做任何干预。 当这个
输入 Observable 完成时, 订阅第二个输入 Observable,同样的发出它的所有值。这个过
程会不断重复直到输入 Observable 都用过了。当最后一个输入 Observable 完成时,concat
也会完成。 任何时刻都只会有一个输入 Observable 发出值。 如果你想让所有的输入 Observable
并行发出数据,请查看merge, 特别的带上concurrent
参数。 事实上,concat
和
concurrent
设置为1的merge
效果是一样的。
注意,如果输入 Observable 一直都不完成, concat
也会一直不能完成并且下一个输入 Observable
将永远不能被订阅. 另一方面, 如果某个输入 Observable 在它被订阅后立马处于完成状态, 那么它对
concat
是不可见的, 仅仅会转向下一个输入 Observable.
如果输入 Observable 链中的任一成员发生错误, concat
会立马触发错误状态,而不去控制下一个输入
Observable. 发生错误的输入 Observable 之后的输入 Observable 不会被订阅.
如果你将同一输入 Observable 传递给concat
多次,结果流会在每次订阅的时候“重复播放”, 这意味着
你可以重复 Observable 多次. 如果你乏味的给concat
传递同一输入 Observable 1000次,你可以试着
用用repeat.
Params:
Name | Type | Attribute | Description |
input1 | ObservableInput | 等待被连接的输入 Observable。 |
|
input2 | ObservableInput | 等待被连接的输入 Observable。 可以传递多个输入Observable. |
|
scheduler | Scheduler |
|
可选的调度器,调度每个 Observable 的订阅。 |
Example:
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = Rx.Observable.concat(timer, sequence);
result.subscribe(x => console.log(x));
// results in:
// 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var result = Rx.Observable.concat([timer1, timer2, timer3]); // note that array is passed
result.subscribe(x => console.log(x));
// results in the following:
// (Prints to console sequentially)
// -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
// -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
// -500ms-> 0 -500ms-> 1 -500ms-> ... 9
const timer = Rx.Observable.interval(1000).take(2);
Rx.Observable.concat(timer, timer) // concating the same Observable!
.subscribe(
value => console.log(value),
err => {},
() => console.log('...and it is done!')
);
// Logs:
// 0 after 1s
// 1 after 2s
// 0 after 3s
// 1 after 4s
// "...and it is done!" also after 4s
public static create(onSubscription: function(observer: Observer): TeardownLogic): Observable source
创建一个新的 Observable ,当观察者( Observer )订阅该 Observable 时,它会执行指定的函数。
创建自定义的 Observable ,它可以做任何你想做的事情
create
将 onSubscription
函数转化为一个实际的 Observable 。每当有人订阅该 Observable 的
时候,onSubscription
函数会接收 Observer 实例作为唯一参数执行。onSubscription
应该
调用观察者对象的 next
, error
和 complete
方法。
带值调用 next
会将该值发出给观察者。调用 complete 意味着该 Observable 结束了发出并且不会做任何事情了。
调用 error
意味着出现了错误,传给 error
的参数应该提供详细的错误信息。
一个格式良好的 Observable 可以通过 next
方法发出任意多个值,但是 complete
和 error
方法只能被调用
一次,并且调用之后不会再调用任何方法。 如果你试图在 Observable 已经完成或者发生错误之后调用next
、 complete
或 error
方法,这些调用将会被忽略,以保护所谓的 Observable 合同。注意,你并不需要一定要在某个时刻
调用 complete
方法,创建一个不会被终止的 Observable 也是完全可以的,一切取决于你的需求。
onSubscription
可以选择性的返回一个函数或者一个拥有 unsubscribe
方法的对象。 当要取消对 Observable
的订阅时,函数或者方法将会被调用,清理所有的资源。比如说,如果你在自己的 Observable 里面使用了
setTimeout
, 当有人要取消订阅的时候, 你可以清理定时器, 这样就可以减少不必要的触发,并且浏览
器(或者其他宿主环境)也不用将计算能力浪费在这种无人监听的定时事件上。
绝大多数情况下你不需要使用 create
,因为现有的操作符创建出来的 Observable 能满足绝大多数使用场景。这也就意味着,
create
是允许你创建任何 Observable 的底层机制,如果你有非常特殊的需求的话,可以使用它。
TypeScript 签名问题
因为 Observable 继承的类已经定义了静态 create
方法,但是签名不同, 不可能给 Observable.create
合适的签名。
正因为如此,给 create
传递的函数将不会进行类型检查,除非你明确指定了特定的签名。
当使用 TypeScript 时,我们建议将传递给 create 的函数签名声明为(observer: Observer) => TeardownLogic
,
其中Observer 和 TeardownLogic 是库提供的接口。
Params:
Name | Type | Attribute | Description |
onSubscription | function(observer: Observer): TeardownLogic | 该函数接受一个观察者,
然后在适当的时机调用观察者的 |
Example:
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
observable.subscribe(
value => console.log(value),
err => {},
() => console.log('this is the end')
);
// 日志:
// 1
// 2
// 3
// "this is the end"
const observable = Rx.Observable.create((observer) => {
observer.error('something went really wrong...');
});
observable.subscribe(
value => console.log(value), // 永远不会被调用
err => console.log(err),
() => console.log('complete') // 永远不会被调用
);
// 日志:
// "something went really wrong..."
const observable = Rx.Observable.create(observer => {
const id = setTimeout(() => observer.next('...'), 5000); // 5s后发出数据
return () => { clearTimeout(id); console.log('cleared!'); };
});
const subscription = observable.subscribe(value => console.log(value));
setTimeout(() => subscription.unsubscribe(), 3000); // 3s后取消订阅
// 日志:
// "cleared!" after 3s
// Never logs "..."
public static defer(observableFactory: function(): SubscribableOrPromise): Observable source
创建一个 Observable,当被订阅的时候,调用 Observable 工厂为每个观察者创建新的 Observable。
惰性创建 Observable, 也就是说, 当且仅当它被订阅的时候才创建。
defer
允许你创建一个 Observable 当且仅当它被订阅的时候,并且为每个订阅者创建新的 Observable。
它一直在等待直到观察者订阅了它, 然后它创建一个新的 Observable,通常会以 Observable 工厂函数的方式。
对每个订阅者它都是新的, 所以即使每个订阅者也许会认为它们订阅的是同一个 Observable, 事实上每个订阅
者获得的是只属于它们的 Observable。
Params:
Name | Type | Attribute | Description |
observableFactory | function(): SubscribableOrPromise | Observable 的工 厂函数,它会在每个 Observer 订阅 Observable 的时候被触发调用. 也可以返回一个 Promise, Promise 将会立刻被转 化为 Observable。 |
Example:
var clicksOrInterval = Rx.Observable.defer(function () {
if (Math.random() > 0.5) {
return Rx.Observable.fromEvent(document, 'click');
} else {
return Rx.Observable.interval(1000);
}
});
clicksOrInterval.subscribe(x => console.log(x));
// 结果如下:
// 如果Math.random()返回的值大于0.5,它会监听"document"上的点击事件; 当document
// 被点击,它会将点击事件对象打印到控制台。 如果结果小于0.5它会每秒发出一个从0开始自增数。
Test:
See:
public static empty(scheduler: Scheduler): Observable source
创建一个什么数据都不发出并且立马完成的 Observable。
仅仅发出 complete 通知,其他什么也不做。
这个静态操作符对于创建一个简单的只发出完成状态通知的 Observable 是非常有用的。 它可以被用来和 其他 Observables 进行组合, 比如在 mergeMap 中使用。
Params:
Name | Type | Attribute | Description |
scheduler | Scheduler |
|
调度器 ( IScheduler ), 用来调度完成通知。 |
Example:
var result = Rx.Observable.empty().startWith(7);
result.subscribe(x => console.log(x));
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
x % 2 === 1 ? Rx.Observable.of('a', 'b', 'c') : Rx.Observable.empty()
);
result.subscribe(x => console.log(x));
// 结果如下:
// x 是间隔的计数比如:0,1,2,3,...
// x 1000ms 出现一次
// 如果 x % 2 等于 1 打印 abc
// 如果 x % 2 不等于1 什么也不输出
Test:
public static forkJoin(sources: *): any source
Params:
Name | Type | Attribute | Description |
sources | * |
Return:
any |
Test:
public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T> source
从一个数组、类数组对象、Promise、迭代器对象或者类 Observable 对象创建一个 Observable.
几乎可以把任何东西都能转化为Observable.
将各种其他对象和数据类型转化为 Observables。 from
将 Promise、类数组对象、迭代器对象
(https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#iterable)
转化为 Observable ,该 Observable 发出 promise、数组或者迭代器的成员。 字符串,在这种上下文里,会被当做字符数组。
类 Observable 对象(包括ES2015里的 Observable 方法)也可以通过此操作符进行转换。
Params:
Name | Type | Attribute | Description |
ish | ObservableInput<T> | 一个可以被订阅的对象, Promise, 类 Observable, 数组, 迭代器或者类数组对象可以被转化。 |
|
scheduler | Scheduler |
|
调度器,用来调度值的发送。 |
Example:
var array = [10, 20, 30];
var result = Rx.Observable.from(array);
result.subscribe(x => console.log(x));
// 结果如下:
// 10 20 30
function* generateDoubles(seed) {
var i = seed;
while (true) {
yield i;
i = 2 * i; // double it
}
}
var iterator = generateDoubles(3);
var result = Rx.Observable.from(iterator).take(10);
result.subscribe(x => console.log(x));
// Results in the following:
// 3 6 12 24 48 96 192 384 768 1536
Test:
public static fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T> source
创建一个 Observable,该 Observable 发出来自给定事件对象的指定类型事件。
创建一个来自于 DOM 事件,或者 Node 的 EventEmitter 事件或者其他事件的 Observable。
通过给“事件目标”添加事件监听器的方式创建 Observable,可能会是拥有addEventListener
和
removeEventListener
方法的对象,一个 Node.js 的 EventEmitter,一个 jQuery 式的 EventEmitter,
一个 DOM 的节点集合, 或者 DOM 的 HTMLCollection。 当输出 Observable 被订阅的时候事件处理函数会被添加,
当取消订阅的时候会将事件处理函数移除。
Params:
Name | Type | Attribute | Description |
target | EventTargetLike | DOMElement, 事件目标, Node.js EventEmitter, NodeList 或者 HTMLCollection 等附加事件处理方法的对象。 |
|
eventName | string | 感兴趣的事件名称, 被 target 发出。 |
|
options | EventListenerOptions |
|
可选的传递给 addEventListener 的参数。 |
selector | SelectorMethodSignature<T> |
|
可选的函数处理结果. 接收事件处理函数的参数,应该返回单个值。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));
// 结果:
// 每次点击 document 时,都会在控制台上输出 MouseEvent 。
Test:
See:
public static fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function, signal?: any): void, selector: function(...args: any): T): Observable<T> source
从一个基于 addHandler/removeHandler 方法的API创建 Observable。
将任何 addHandler/removeHandler 的API转化为 Observable。
创建 Observable ,该 Observable 通过使用addHandler
和 removeHandler
添加和删除事件处理器,
使用可选的选择器函数将事件参数转化为结果. addHandler
当输出 Observable 被订阅的时候调用, removeHandler
方法在取消订阅的时候被调用。
Params:
Name | Type | Attribute | Description |
addHandler | function(handler: Function): any | 一个接收处理器的函数,并且将 该处理器添加到事件源。 |
|
removeHandler | function(handler: Function, signal?: any): void |
|
可选的
函数,接受处理器函数做为参数,可以移除处理器当之前使用 |
selector | function(...args: any): T |
|
可选的函数处理结果。 接受事件处理的参数返 回单个的值。 |
Example:
function addClickHandler(handler) {
document.addEventListener('click', handler);
}
function removeClickHandler(handler) {
document.removeEventListener('click', handler);
}
var clicks = Rx.Observable.fromEventPattern(
addClickHandler,
removeClickHandler
);
clicks.subscribe(x => console.log(x));
public static fromPromise(promise: PromiseLike<T>, scheduler: Scheduler): Observable<T> source
将 Promise 转化为 Observable。
返回一个仅仅发出 Promise resolve 过的值然后完成的 Observable。
把 ES2015 的 Promise 或者兼容 Promises/A+ 规范的 Promise 转化为 Observable。 如果 Promise resolves 一个值, 输出 Observable 发出这个值然后完成。 如果 Promise 被 rejected, 输出 Observable 会发出相应的 错误。
Params:
Name | Type | Attribute | Description |
promise | PromiseLike<T> | 被转化的 promise。 |
|
scheduler | Scheduler |
|
可选的调度器,用来调度 resolved 或者 rejection 的值。 |
Example:
var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));
Test:
See:
public static interval(period: number, scheduler: Scheduler): Observable source
创建一个 Observable ,该 Observable 使用指定的 IScheduler ,并以指定时间间隔发出连续的数字。
定期发出自增的数字。
interval
返回一个发出无限自增的序列整数, 你可以选择固定的时间间隔进行发送。 第一次并
没有立马去发送, 而是第一个时间段过后才发出。 默认情况下, 这个操作符使用 async 调度器来
提供时间的概念,但也可以给它传递任意调度器。
Example:
var numbers = Rx.Observable.interval(1000);
numbers.subscribe(x => console.log(x));
Test:
public static merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable source
创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中值。
通过把多个 Observables 的值混合到一个 Observable 中来将其打平。
merge
订阅每个给定的输入 Observable (作为参数),然后只是将所有输入 Observables 的所有值发
送(不进行任何转换)到输出 Observable 。所有的输入 Observable 都完成了,输出 Observable 才
能完成。任何由输入 Observable 发出的错误都会立即在输出 Observalbe 上发出。
Params:
Name | Type | Attribute | Description |
observables | ...ObservableInput | 合并到一起的输入Observables。 |
|
concurrent | number |
|
可以同时订阅的输入 Observables 的最大数量。 |
scheduler | Scheduler |
|
调度器用来管理并行的输入Observables。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = Rx.Observable.merge(clicks, timer);
clicksOrTimer.subscribe(x => console.log(x));
// 结果如下:
// 每隔1s发出一个自增值到控制台
// document被点击的时候MouseEvents会被打印到控制台
// 因为两个流被合并了,所以你当它们发生的时候你就可以看见.
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // the argument
var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));
// 结果如下:
// - timer1和timer2将会并行运算
// - timer1每隔1s发出值,迭代10次
// - timer2每隔1s发出值,迭代6次
// - timer1达到迭代最大次数,timer2会继续,timer3开始和timer2并行运行
// - 当timer2达到最大迭代次数就停止,timer3将会继续每隔500ms发出数据直到结束
public static never(): Observable source
创建一个不向观察者发出任何项的 Observable 。
从不发出任何项的 Observable 。
这个静态操作符对于创建既不发出数据也不触发错误和完成通知的 Observable。 可以用来测试或 者和其他 Observables进行组合。 注意,由于不会发送完成通知,这个 Observable 的 subscription 不会被自动地清理。Subscriptions 需要手动清理。
Example:
function info() {
console.log('Will not be called');
}
var result = Rx.Observable.never().startWith(7);
result.subscribe(x => console.log(x), info, info);
Test:
public static of(values: ...T, scheduler: Scheduler): Observable<T> source
创建一个 Observable,它会依次发出由你提供的参数,最后发出完成通知。 发出你提供的参数,然后完成。
这个静态操作符适用于创建简单的 Observable ,该 Observable 只发出给定的参数,
在发送完这些参数后发出完成通知。它可以用来和其他 Observables 组合比如说concat。
默认情况下,它使用null
调度器,这意味着next
通知是同步发出的,
尽管使用不同的调度器可以决定这些通知何时送到。
Params:
Name | Type | Attribute | Description |
values | ...T | 表示 |
|
scheduler | Scheduler |
|
用来调度 next 通知发送的调度器( IScheduler )。 |
Example:
var numbers = Rx.Observable.of(10, 20, 30);
var letters = Rx.Observable.of('a', 'b', 'c');
var interval = Rx.Observable.interval(1000);
var result = numbers.concat(letters).concat(interval);
result.subscribe(x => console.log(x));
Test:
public static range(start: number, count: number, scheduler: Scheduler): Observable source
创建一个 Observable ,它发出指定范围内的数字序列。
发出区间范围内的数字序列。
range
操作符顺序发出一个区间范围内的连续整数, 你可以决定区间的开始和长度。 默认情况下, 不使用
调度器仅仅同步的发送通知, 但是也可以可选的使用可选的调度器来控制发送。
Example:
var numbers = Rx.Observable.range(1, 10);
numbers.subscribe(x => console.log(x));
Test:
public static throw(将具体的: any, scheduler: Scheduler): Observable source
创建一个不发送数据给观察者并且立马发出错误通知的 Observable。
仅仅发出 error 通知,其他什么也不做。
这个静态操作符对于创建简单的只发出错误通知的 Observable 十分有用。 可以被用来和其他 Observables 组合, 比如在 mergeMap 中使用。
Params:
Name | Type | Attribute | Description |
将具体的 | any | Error 传递给错误通知。 |
|
scheduler | Scheduler |
|
调度器IScheduler,用来调度错误通知的发送。 |
Example:
var result = Rx.Observable.throw(new Error('oops!')).startWith(7);
result.subscribe(x => console.log(x), e => console.error(e));
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
x === 13 ?
Rx.Observable.throw('Thirteens are bad') :
Rx.Observable.of('a', 'b', 'c')
);
result.subscribe(x => console.log(x), e => console.error(e));
Test:
public static timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable source
创建一个 Observable,该 Observable 在初始延时(initialDelay
)之后开始发送并且在每个时间周期( period
)后发出自增的数字。
就像是interval, 但是你可以指定什么时候开始发送。
timer
返回一个发出无限自增数列的 Observable, 具有一定的时间间隔,这个间隔由你来选择。 第一个发送发生在
初始延时之后. 初始延时就像是Date。 默认情况下, 这个操作符使用 async 调度器来提供时间的概念,
但是你也可以传递任何调度器。 如果时间周期没有被指定, 输出 Observable 只发出0。 否则,会发送一个无限数列。
Example:
var numbers = Rx.Observable.timer(3000, 1000);
numbers.subscribe(x => console.log(x));
var numbers = Rx.Observable.timer(5000);
numbers.subscribe(x => console.log(x));
Test:
public static webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject source
包装浏览器提供的兼容w3c的WebSocket对象.
Params:
Name | Type | Attribute | Description |
urlConfigOrSource | string | WebSocketSubjectConfig | the source of the websocket as an url or a structure defining the websocket object |
Example:
let socket$ = Observable.webSocket('ws://localhost:8081');
socket$.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete')
);
socket$.next(JSON.stringify({ op: 'hello' }));
import { w3cwebsocket } from 'websocket';
let socket$ = Observable.webSocket({
url: 'ws://localhost:8081',
WebSocketCtor: w3cwebsocket
});
socket$.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete')
);
socket$.next(JSON.stringify({ op: 'hello' }));
Test:
public static zip(observables: *): Observable<R> source
将多个 Observable 组合以创建一个 Observable,该 Observable 的值是由所有输入 Observables 的值按顺序计算而来的。
如果最后一个参数是函数, 这个函数被用来计算最终发出的值.否则, 返回一个顺序包含所有输入值的数组.
Params:
Name | Type | Attribute | Description |
observables | * |
Example:
let age$ = Observable.of<number>(27, 25, 29);
let name$ = Observable.of<string>('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of<boolean>(true, true, false);
Observable
.zip(age$,
name$,
isDev$,
(age: number, name: string, isDev: boolean) => ({ age, name, isDev }))
.subscribe(x => console.log(x));
// 输出:
// { age: 27, name: 'Foo', isDev: true }
// { age: 25, name: 'Bar', isDev: true }
// { age: 29, name: 'Beer', isDev: false }
Public Constructors
Public Methods
public [Symbol_observable](): Observable source
An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable
public audit(durationSelector: function(value: T): SubscribableOrPromise): Observable<T> source
在另一个 Observable 决定的时间段里忽略源数据,然后发出源 Observable 中最新发出的值, 然后重复此过程。
就像是auditTime, 但是沉默持续时间段由第二个 Observable 决定。
audit
和 throttle
很像, 但是发出沉默时间窗口的最后一个值, 而不是第一个。只要 audit 的内部时间器被禁用,
它就会在输出 Observable 上发出源 Observable 的最新值,并且当时间器启用时忽略源值。初始时,时间器是禁用的。
只要第一个源值到达,时间器是用源值调用 durationselector 方法启用,返回 "duration" Observable。 当 duration Observable
发出数据或者完成时,时间器禁用,然后输出 Observable 发出最新的源值,并且不断的重复这个过程。
Params:
Name | Type | Attribute | Description |
durationSelector | function(value: T): SubscribableOrPromise | 该函数从源 Observable 中接收值,用于为每个源值计算沉默持续时间,并返回 Observable 或 Promise 。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.audit(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
public auditTime(duration: number, scheduler: Scheduler): Observable<T> source
duration 毫秒内忽略源值,然后发出源 Observable 的最新值, 并且重复此过程。
当它看见一个源值,它会在接下来的 duration 毫秒内忽略这个值以及接下来的源值,过后发出最新的源值。
auditTime
和 throttleTime
很像, 但是发送沉默时间窗口的最后一个值, 而不是第一个。只要 audit 的内部时间器被禁用,它就会在
输出 Observable 上发出源 Observable 的最新值,并且当定时器启用时忽略源值。初始时,时间器是禁用的。只要第一个值到达, 时间器被启用。度过
持续时间后(或者时间单位由内部可选的参数调度器决定),时间间隔被禁用, 输出 Observable 发出最新的值, 不断的重复这个过程。可选项 IScheduler 用来管理时间器。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.auditTime(1000);
result.subscribe(x => console.log(x));
public buffer(closingNotifier: Observable<any>): Observable<T[]> source
缓冲源 Observable 的值直到 closingNotifier
发出。
将过往的值收集到一个数组中,并且仅当另一个 Observable 发出通知时才发出此数组。
将 Observable 发出的值缓冲起来直到 closingNotifier
发出数据, 在这个时候在输出
Observable 上发出该缓冲区的值并且内部开启一个新的缓冲区, 等待下一个closingNotifier
的发送。
Params:
Name | Type | Attribute | Description |
closingNotifier | Observable<any> | 该 Observable 向输出 Observale 发出信号以通知发出缓冲区。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var buffered = interval.buffer(clicks);
buffered.subscribe(x => console.log(x));
public bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]> source
缓冲源 Observable 的值直到缓冲数量到达设定的 bufferSize
.
将过往的值收集到一个数组中,当数组数量到达设定的 bufferSize 时发出该数组。
缓冲源 Observable 的N个值(N = bufferSize),然后发出该缓冲区并进行清理,再然后开启一个新的缓存区,新缓存区会新缓存M个值(M = startBufferEvery)。
如果startBufferEvery
没有提供或者为null
, 新的缓冲会在源开始的时候开启并且在每次发出的时候关闭。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2);
buffered.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2, 1);
buffered.subscribe(x => console.log(x));
public bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]> source
在特定时间周期内缓冲源 Observable 的值。
将过往的值收集到数组中,并周期性地发出这些数组。
在一个特定的持续时间bufferTimeSpan
内缓存源 Observable 的值。 除非指定了可选参数bufferCreationInterval
, 它会发出数组并且重置缓冲区每个bufferTimeSpan
毫秒。这个操作符会在每个 bufferCreationInterval 毫秒时开启缓冲区,
并在每个 bufferTimeSpan 毫秒时关闭(发出并重置)缓冲区。如果可选参数maxBufferSize
被指定, 缓冲区会在bufferTimeSpan
毫秒
之后或者缓冲区元素个数达到maxBufferSize
时发出。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(1000);
buffered.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(2000, 5000);
buffered.subscribe(x => console.log(x));
See:
- buffer
- bufferCount
- bufferToggle
- bufferWhen
- windowTime
public bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]> source
缓冲源 Observable 的值,openings
发送的时候开始缓冲,closingSelector
发送的时候结束缓冲。
将过往数据收集到数组中. 当opening
发送的时候开始收集, 然后调用closingSelector
函数获取 Observable ,该Observable 告知什么时候关闭缓冲。
缓冲源Observable的值,当openings
Observable发出信号的时候开始缓冲数据, 当closingSelector
返回的Subscribable
或者Promise发送的时候结束并且发送缓冲区.
Params:
Name | Type | Attribute | Description |
openings | SubscribableOrPromise<O> | 开启新缓冲区的通知,可以是 Subscribable 或 Promise 。 |
|
closingSelector | function(value: O): SubscribableOrPromise | 接受 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var buffered = clicks.bufferToggle(openings, i =>
i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
);
buffered.subscribe(x => console.log(x));
public bufferWhen(closingSelector: function(): Observable): Observable<T[]> source
缓冲源 Observable 的值, 使用关闭 Observable 的工厂函数来决定何时关闭、发出和重置缓冲区。
将过往的值收集到数组中, 当开始收集数据的时候, 调用函数返回 Observable, 该 Observable 告知何时关闭缓冲区并重新开始收集。
立马开启缓冲区, 然后当closingSelector
函数返回的observable发出数据的时候关闭缓冲区.
当关闭缓冲区的时候, 会立马开启新的缓冲区,并不断重复此过程。
Params:
Name | Type | Attribute | Description |
closingSelector | function(): Observable | 该函数不接受参数,并返回通知缓冲区关闭的 Observable 。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferWhen(() =>
Rx.Observable.interval(1000 + Math.random() * 4000)
);
buffered.subscribe(x => console.log(x));
public catch(selector: function): Observable source
捕获 observable 中的错误,可以通过返回一个新的 observable 或者抛出错误对象来处理。
Params:
Name | Type | Attribute | Description |
selector | function | 该函数接受 |
Example:
Observable.of(1, 2, 3, 4, 5)
.map(n => {
if (n == 4) {
throw 'four!';
}
return n;
})
.catch(err => Observable.of('I', 'II', 'III', 'IV', 'V'))
.subscribe(x => console.log(x));
// 1, 2, 3, I, II, III, IV, V
Observable.of(1, 2, 3, 4, 5)
.map(n => {
if (n === 4) {
throw 'four!';
}
return n;
})
.catch((err, caught) => caught)
.take(30)
.subscribe(x => console.log(x));
// 1, 2, 3, 1, 2, 3, ...
Observable.of(1, 2, 3, 4, 5)
.map(n => {
if (n == 4) {
throw 'four!';
}
return n;
})
.catch(err => {
throw 'error in source. Details: ' + err;
})
.subscribe(
x => console.log(x),
err => console.log(err)
);
// 1, 2, 3, error in source. Details: four!
public combineAll(project: function): Observable source
通过等待外部 Observable 完成然后应用 combineLatest ,将高阶 Observable 转化为一阶 Observable。
当高阶 Observable 完成时,通过使用 combineLatest 将其打平。
接受一个返回 Observables 的 Observable, 并从中收集所有的 Observables 。 一旦最外部的 Observable 完成, 会订阅所有收集的 Observables 然后通过combineLatest合并值, 这样:
- 每次内部 Observable 发出的时候, 外部 Observable 也发出。
- 当返回的 observable 发出的时候, 它会通过如下方式发出所有最新的值:
- 如果提供了`project`函数, 该函数会按内部 Observable 到达的顺序依次使用每个内部 Observable 的最新值进行调用。
- 如果没有提供`project`函数, 包含所有最新数据的数组会被输出 Observable 发出。
Params:
Name | Type | Attribute | Description |
project | function |
|
它按顺序的从每个收集到的内部 Observable 中接收最新值作为参数。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map(ev =>
Rx.Observable.interval(Math.random()*2000).take(3)
).take(2);
var result = higherOrder.combineAll();
result.subscribe(x => console.log(x));
public combineLatest(other: ObservableInput, project: function): Observable source
组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。
它将使用所有输入中的最新值计算公式,然后发出该公式的输出。
combineLatest
结合传入的多个 Observables。 通过顺序的订阅每个输入Observable, 在每次任一输入Observables发送的时候收集
每个输入Observables最新的值组成一个数组, 然后要么将这个数组传给可选的投射函数并发送投射函数返回的结果,
或者在没有提供投射函数时仅仅发出该数组。
Params:
Name | Type | Attribute | Description |
other | ObservableInput | 将要和源 Observable 结合的输入 Observable。 可以传入多个输入 Observables。 |
|
project | function |
|
可选的投射函数,将输出 Observable 返回的值投射为要发出的新的值。 |
Example:
var weight = Rx.Observable.of(70, 72, 76, 79, 75);
var height = Rx.Observable.of(1.76, 1.77, 1.78);
var bmi = weight.combineLatest(height, (w, h) => w / (h * h));
bmi.subscribe(x => console.log('BMI is ' + x));
// With output to console:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222
public concat(other: ObservableInput, scheduler: Scheduler): Observable source
创建一个输出 Observable,它在当前 Observable 之后顺序地发出每个给定的输入 Observable 中的所有值。
通过顺序地发出多个 Observables 的值将它们连接起来,一个接一个的。
通过依次订阅输入Observable将输出Observable加入多个输入Observable,从源头开始, 合并它们的值给输出Observable. 只有前一个Observable结束才会进行下一个Observable。
Params:
Name | Type | Attribute | Description |
other | ObservableInput | 等待被连接的 Observable。 可以接受多个输入 Observable。 |
|
scheduler | Scheduler |
|
可选的调度器,控制每个输入 Observable 的订阅。 |
Example:
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = timer.concat(sequence);
result.subscribe(x => console.log(x));
// results in:
// 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var result = timer1.concat(timer2, timer3);
result.subscribe(x => console.log(x));
// results in the following:
// (Prints to console sequentially)
// -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
// -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
// -500ms-> 0 -500ms-> 1 -500ms-> ... 9
public concatAll(): Observable source
通过顺序地连接内部 Observable,将高阶 Observable 转化为一阶 Observable 。
通过一个接一个的连接内部 Observable ,将高阶 Observable 打平。
串行连接源(高阶 Observable)所发出的每个 Observable,只有当一个内部 Observable 完成的时候才订阅下 一个内部 Observable,并将它们的所有值合并到返回的 Observable 中。
警告: 如果源 Observable 很快并且不停的发送 Observables, 内部 Observables 发送的完成 通知比源 Observable 慢, 你会遇到内存问题,因为传入的 Observables 在无界缓冲区中收集.
注意: concatAll 等价于 concurrency 参数(最大并发数)为1的 mergeAll 。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map(ev => Rx.Observable.interval(1000).take(4));
var firstOrder = higherOrder.concatAll();
firstOrder.subscribe(x => console.log(x));
// 结果如下:
// (结果是串行的)
// 对于"document"对象上的点击事件,都会以1秒的间隔发出从0到3的值
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
public concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source
将源值投射为一个合并到输出 Observable 的 Observable,以串行的方式等待前一个完成再合并下一个 Observable。
将每个值映射为 Observable, 然后使用concatAll将所有的 内部 Observables 打平。
返回一个 Observable,该 Observable 发出基于对源 Observable 发出的值调用提供的函数, 该函数返回所谓的内部 Observable。 每个新的内部 Observable 和前一个内部 Observable 连接在一起。
警告: 如果源值不断的到达并且速度快于内部 Observables 完成的速度, 它会导致内存问题, 因为内部的 Observable 在无限制的缓冲区中聚集,以等待轮流订阅。
Note: `concatMap` 等价于 `concurrency` 参数(最大并发数)为1的 `mergeMap` 。
Params:
Name | Type | Attribute | Description |
project | function(value: T, ?index: number): ObservableInput | 用在源Observable发出的每个值上,返回Observable. |
|
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any |
|
函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。 传递给这个函数参数有:
|
Return:
Observable | Observable,发出对源Observable发出的每个值使用投射函数
(和可选的 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMap(ev => Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));
// 结果如下:
// (结果是串行的)
// 对于"document"对象上的点击事件,都会以1秒的间隔发出从0到3的值
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
public concatMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source
将每个源值投射成同一个 Observable ,该 Observable 会以串行的方式多次合并到输出 Observable 中 。
就像是concatMap, 但是将每个值总是映射为同一个内部 Observable。
不管源值是多少都将其映射为给定的innerObservable
, 然后将其打平为单个 Observable,也就
是所谓的输出 Observable。 在输出 Observable 上发出的每个新的 innerObservable 实例与
先前的 innerObservable 实例相连接。
警告: 如果源值不断的到达并且速度快于内部Observables完成的速度, 它会导致内存问题 因为内部的 Observable 在无限制的缓冲区中聚集,以等待轮流订阅。
Note: concatMapTo
is equivalent to mergeMapTo
with concurrency parameter
set to 1
.
Params:
Name | Type | Attribute | Description |
innerObservable | ObservableInput | Observable,替换源Observable的每个值. |
|
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any |
|
函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。 传递给这个函数参数有:
|
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMapTo(Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));
// 结果如下:
// (结果不是并行的)
// 对于"document"对象上的点击事件,都会以1秒的间隔发出从0到3的值
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
public count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable source
计算源的发送数量,并当源完成时发出该数值。
当源完成的时候,告知总共发送了多少个值。
count
将发送数据的 Observable 转化为只发出源 Observable 总共发出的数据项的 Observable。
如果源 Observable 发生错误, count
将会发出错误而不是发出值。 如果源 Observable
一直不终结, count
既不会终结也不会发出数据。 这个操作符接受可选的predicate
函数做为参数,
在这种情况下,输出则表示源值中满足 predicate 函数的值的数量。
Params:
Name | Type | Attribute | Description |
predicate | function(value: T, i: number, source: Observable<T>): boolean |
|
A boolean 函数,用来选择哪些值会被计数。 参数如下:
|
Example:
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var secondsBeforeClick = seconds.takeUntil(clicks);
var result = secondsBeforeClick.count();
result.subscribe(x => console.log(x));
var numbers = Rx.Observable.range(1, 7);
var result = numbers.count(i => i % 2 === 1);
result.subscribe(x => console.log(x));
// 结果是:
// 4
public debounce(durationSelector: function(value: T): SubscribableOrPromise): Observable source
只有在另一个 Observable 决定的一段特定时间经过后并且没有发出另一个源值之后,才从源 Observable 中发出一个值。
就像是 debounceTime, 但是静默时间段由第二个 Observable 决定。
debounce
延时发送源 Observable 发出的值,但如果源 Observable 发出了新值
的话,它会丢弃掉前一个等待中的延迟发送。这个操作符会追踪源 Observable 的最新值,
并通过调用 durationSelector 函数来生产 duration Observable。只有当
duration Observable 发出值或完成时,才会发出值,如果源 Observable 上没有发
出其他值,那么 duration Observable 就会产生。如果在 duration Observable 发
出前出现了新值,那么前一个值会被丢弃并且不会在输出 Observable 上发出。
就像debounceTime, 这是一个限制发出频率的操作符, 因为输出发送并不一定是 在同一时间发生的,就像它们在源 Observable 上所做的那样。
Params:
Name | Type | Attribute | Description |
durationSelector | function(value: T): SubscribableOrPromise | 该函数接受 源Observable的值, 用于计算每个值的延迟持续时间, 返回一个Observable或者Promise. |
Return:
Observable | Observable,通过 durationSelector 返回的特定 duration Observable 来延迟源 Observable 的发送,如果发送过于频繁可能会丢弃一些值。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounce(() => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
public debounceTime(dueTime: number, scheduler: Scheduler): Observable source
只有在特定的一段时间经过后并且没有发出另一个源值,才从源 Observable 中发出一个值。
就像是delay, 但是只通过每次大量发送中的最新值。
debounceTime
延时发送源 Observable 发送的值,但是会丢弃正在排队的发送如果源 Observable
又发出新值。 该操作符会追踪源 Observable 的最新值, 并且发出它当且仅当在 dueTime
时间段内
没有发送行为。 如果新的值在dueTime
静默时间段出现, 之前的值会被丢弃并且不会在输出 Observable
中发出。
这是一个控制发送频率的操作符,因为不可能在任何时间窗口的持续时间(dueTime)内发出一个以上的值,同样也是一个延时类操作符,因为输出 并不一定发生在同一时间,正如源 Observable 上发生的。 可选性的接收一个 IScheduler 用于管理定时器。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounceTime(1000);
result.subscribe(x => console.log(x));
public defaultIfEmpty(defaultValue: any): Observable source
如果源 Observable 在完成之前没有发出任何 next 值,则发出给定的值,否则返回 Observable 的镜像。
如果源Observable本来就是空的,那么这个操作符会发出一个默认值。
如果源 Observable 是空的(在完成之前没有发出任何 next 值),那么 defaultIfEmpty 会发出源 Observable 或指定的默认值。
Params:
Name | Type | Attribute | Description |
defaultValue | any |
|
如果源Observable是空的话使用的默认值。 |
Return:
Observable | Observable, 当源 Observable 不发出值时,该 Observable 发出指定的 defaultValue ,否则发出源 Observable 所发出的值。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksBeforeFive = clicks.takeUntil(Rx.Observable.interval(5000));
var result = clicksBeforeFive.defaultIfEmpty('no clicks');
result.subscribe(x => console.log(x));
public delay(delay: number | Date, scheduler: Scheduler): Observable source
通过给定的超时或者直到一个给定的时间来延迟源 Observable 的发送。
每个数据项的发出时间都往后推移固定的毫秒数.
如果延时参数是数字, 这个操作符会将源 Observable 的发出时间都往后推移固定的毫秒数。 保存值之间的相对时间间隔.
如果延迟参数是日期类型, 这个操作符会延时Observable的执行直到到了给定的时间.
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
delayedClicks.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var date = new Date('March 15, 2050 12:00:00'); // in the future
var delayedClicks = clicks.delay(date); // click emitted only after that date
delayedClicks.subscribe(x => console.log(x));
public delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable source
在给定的时间范围内,延迟源 Observable 所有数据项的发送,该时间段由另一个 Observable 的发送决定。
就像是delay, 但是延时的时间间隔由第二个Observable决定.
delayWhen
通过由另一个 Observable 决定的时间段来延迟源 Observable 的每个发出值。
当源发出一个数据,delayDurationSelector
函数将该源值当做参数, 返回一个被称为“持续”的 Observable。
当且仅当持续的 Observable 发出或完成时,源值才会在输出 Observable 上发出。
可选的, delayWhen
接受第二个参数, subscriptionDelay
,它是一个 Observable。
当 subscriptionDelay
发出第一个值或者完成,源 Observable 被订阅并且开始像前一段描
述的一样。 如果 subscriptionDelay
没有提供,delayWhen
将会订阅源 Observable 只
要输出 Observable 被订阅。
Params:
Name | Type | Attribute | Description |
delayDurationSelector | function(value: T): Observable | 该函数为源 Observable 发出的每个值返回一个 Observable,用于延迟在输出 Observable 上的发送,直到返回的 Observable 发出值。 |
|
subscriptionDelay | Observable | Observable,该 Observable 一旦发出任何值则触发源 Observable 的订阅。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delayWhen(event =>
Rx.Observable.interval(Math.random() * 5000)
);
delayedClicks.subscribe(x => console.log(x));
public dematerialize(): Observable source
将 Notification 对象的 Observable 转换成它们所代表的发送。
将 Notification 对象拆开成实际的 next
、
error
和 complete
发送。它与 materialize 是相反的。
dematerialize
被假定用来操作只发送值为 Notification 对象的 next
,
不发送 error
的 Observable 。这样的 Obseravble 其实是 materialize
操作符的输出。然后这些通知会使用它们所包含的元数据进行拆解,并在输出 Observable 上
发出 next
、error
和 complete
。
与 materialize 结合来使用此操作符。
Example:
var notifA = new Rx.Notification('N', 'A');
var notifB = new Rx.Notification('N', 'B');
var notifE = new Rx.Notification('E', void 0,
new TypeError('x.toUpperCase is not a function')
);
var materialized = Rx.Observable.of(notifA, notifB, notifE);
var upperCase = materialized.dematerialize();
upperCase.subscribe(x => console.log(x), e => console.error(e));
// 结果:
// A
// B
// TypeError: x.toUpperCase is not a function
public distinct(keySelector: function, flushes: Observable): Observable source
返回 Observable,它发出由源 Observable 所发出的所有与之前的项都不相同的项。
如果提供了 keySelector 函数,那么它会将源 Observable 的每个值都投射成一个新的值,这个值会用来检查是否与先前投射的值相等。如果没有提供 keySelector 函数,它会直接使用源 Observable 的每个值来检查是否与先前的值相等。
在支持 Set
的 JavaScript 运行时中,此操作符会使用 Set
来提升不同值检查的性能。
在其他运行时中,此操作符会使用 Set
的最小化实现,此实现在底层依赖于 Array
和 indexOf
,因为要检查更多的值来进行区分,所以性能会降低。
即使是在新浏览器中,长时间运行的 distinct
操作也可能会导致内存泄露。为了在某种场景下来缓解这个问题,可以提供一个可选的 flushes
参数,
这样内部的 Set
可以被“清空”,基本上清除了它的所有值。
Params:
Name | Type | Attribute | Description |
keySelector | function |
|
可选函数,用来选择某个键的值以检查是否是不同的。 |
flushes | Observable |
|
可选 Observable,用来清空操作符内部的 HashSet 。 |
Example:
Observable.of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)
.distinct()
.subscribe(x => console.log(x)); // 1, 2, 3, 4
interface Person {
age: number,
name: string
}
Observable.of<Person>(
{ age: 4, name: 'Foo'},
{ age: 7, name: 'Bar'},
{ age: 5, name: 'Foo'})
.distinct((p: Person) => p.name)
.subscribe(x => console.log(x));
// 显示:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
public distinctUntilChanged(compare: function): Observable source
返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项。
如果提供了 compare 函数,那么每一项都会调用它来检验是否应该发出这个值。
如果没有提供 compare 函数,默认使用相等检查。
Params:
Name | Type | Attribute | Description |
compare | function |
|
可选比较函数,用来检验当前项与源中的前一项是否相同。 |
Example:
Observable.of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4)
.distinctUntilChanged()
.subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4
interface Person {
age: number,
name: string
}
Observable.of<Person>(
{ age: 4, name: 'Foo'},
{ age: 7, name: 'Bar'},
{ age: 5, name: 'Foo'})
{ age: 6, name: 'Foo'})
.distinctUntilChanged((p: Person, q: Person) => p.name === q.name)
.subscribe(x => console.log(x));
// 显示:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }
public distinctUntilKeyChanged(key: string, compare: function): Observable source
返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项,使用通过提供的 key 访问到的属性来检查两个项是否不同。
如果提供了 compare 函数,那么每一项都会调用它来检验是否应该发出这个值。
如果没有提供 compare 函数,默认使用相等检查。
Example:
interface Person {
age: number,
name: string
}
Observable.of<Person>(
{ age: 4, name: 'Foo'},
{ age: 7, name: 'Bar'},
{ age: 5, name: 'Foo'},
{ age: 6, name: 'Foo'})
.distinctUntilKeyChanged('name')
.subscribe(x => console.log(x));
// 显示:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }
interface Person {
age: number,
name: string
}
Observable.of<Person>(
{ age: 4, name: 'Foo1'},
{ age: 7, name: 'Bar'},
{ age: 5, name: 'Foo2'},
{ age: 6, name: 'Foo3'})
.distinctUntilKeyChanged('name', (x: string, y: string) => x.substring(0, 3) === y.substring(0, 3))
.subscribe(x => console.log(x));
// 显示:
// { age: 4, name: 'Foo1' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo2' }
public do(nextOrObserver: Observer | function, error: function, complete: function): Observable source
为源 Observable 上的每次发送执行副作用,但返回的 Observable 与源 Observable 是相同的。
拦截源 Observable 上的每次发送并且运行一个函数,但返回的输出 Observable 与 源 Observable 是相同的,只要不发生错误即可。
返回源 Observable 的镜像,但镜像是修改过的,以便调用提供的 Observer 来为源 Observable 发出的每个值,错误和完成执行副作用。在上述的 Observer 或处理方法中抛出的任何错误都可以 安全地发送到输出 Observable 的错误路径中。
此操作符适用于调试 Observables 以查看值是否正确,或者执行一些其他的副作用操作。
注意:此操作符不同于 Observable 的 subscribe
。如果 do
返回的 Observable 没有被订阅,
那么观察者指定的副作用永远不会执行。因此 do
只是侦查已存在的执行,它不会像 subscribe
那样触发执行的发生。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks
.do(ev => console.log(ev))
.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));
public elementAt(index: number, defaultValue: T): Observable source
只发出单个值,这个值位于源 Observable 的发送序列中的指定 index
处。
只发出第i个值, 然后完成。
elementAt
返回的 Observable 会发出源 Observable 指定 index
处的项,如果
index
超出范围并且提供了 default
参数的话,会发出一个默认值。如果没有提供
default
参数并且 index
超出范围,那么输出 Observable 会发出一个
ArgumentOutOfRangeError
错误。
Params:
Name | Type | Attribute | Description |
index | number | 是 Subscription 开始后的第i个通知的索引数值,该值是从 |
|
defaultValue | T |
|
缺失索引时返回的默认值。 |
Throw:
当使用 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.elementAt(2);
result.subscribe(x => console.log(x));
// 结果:
// click 1 = nothing
// click 2 = nothing
// click 3 = 打印到控制台的 MouseEvent 对象
public every(predicate: function, thisArg: any): Observable source
返回的 Observable 发出是否源 Observable 的每项都满足指定的条件。
Params:
Name | Type | Attribute | Description |
predicate | function | 用来确定每一项是否满足指定条件的函数。 |
|
thisArg | any |
|
可选对象,作为回调函数中的 |
Example:
Observable.of(1, 2, 3, 4, 5, 6)
.every(x => x < 5)
.subscribe(x => console.log(x)); // -> false
public exhaust(): Observable source
当前一个内部 Observable 还未完成的情况下,通过丢弃内部 Observable 使得 高阶 Observable 转换成一阶 Observable。
在当前内部 Observable 仍在执行的情况下,通过丢弃 接下来的内部 Observable 将高阶 Observable 打平。
exhaust
订阅发出 Observables 的 Observable,也就是高阶 Observable 。
每次观察到这些已发出的内部 Observables 中的其中一个时,输出 Observable 开始发出该内部 Observable
要发出的项。到目前为止,它的行为就像 mergeAll 。然而,如果前一个 Observable
还未完成的话,exhaust
会忽略每个新的内部 Observable 。一旦完成,它将接受并打平下一个
内部 Observable ,然后重复此过程。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(5));
var result = higherOrder.exhaust();
result.subscribe(x => console.log(x));
public exhaustMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source
将每个源值投射成 Observable,只有当前一个投射的 Observable 已经完成, 这个 Observable 才会被合并到输出 Observable 中。
把每个值映射成 Observable,然后使用 exhaust 操作符打平所有的内部 Observables 。
返回的 Observable 基于应用一个函数来发送项,该函数提供给源 Observable 发出的每个项,
并返回一个(所谓的“内部”) Observable 。当它将源值投射成 Observable 时,输出 Observable
开始发出由投射的 Observable 发出的项。然而,如果前一个投射的 Observable 还未完成的话,
那么 exhaustMap
会忽略每个新投射的 Observable 。一旦完成,它将接受并打平下一个
内部 Observable ,然后重复此过程。
Params:
Name | Type | Attribute | Description |
project | function(value: T, ?index: number): ObservableInput | 函数, 当应用于源 Observable 发出的项时,返回一个 Observable 。 |
|
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any |
|
函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。 传递给这个函数参数有:
|
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.exhaustMap((ev) => Rx.Observable.interval(1000).take(5));
result.subscribe(x => console.log(x));
public expand(project: function(value: T, index: number), concurrent: number): Observable source
递归地将每个源值投射成 Observable,这个 Observable 会被合并到输出 Observable 中。
它与 mergeMap 类似,但将投射函数应用于每个源值 以及每个输出值。它是递归的。
返回的 Observable 基于应用一个函数来发送项,该函数提供给源 Observable 发出的每个项,
并返回一个 Observable,然后合并这些作为结果的 Observable,并发出本次合并的结果。
expand
会重新发出在输出 Observable 上的每个源值。然后,将每个输出值传给投射函数,
该函数返回要合并到输出 Observable 上的内部 Observable 。由投影产生的那些输出值也会
被传给投射函数以产生新的输出值。这就是 expand
如何进行递归的。
Return:
Observable | Observable 发出源值,同时也将投影函数应用于在输出 Observable 上 发出的每个值以得到结果,然后合并这些从转换后得到的 Observables 的结果。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var powersOfTwo = clicks
.mapTo(1)
.expand(x => Rx.Observable.of(2 * x).delay(1000))
.take(10);
powersOfTwo.subscribe(x => console.log(x));
public filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable source
通过只发送源 Observable 的中满足指定 predicate 函数的项来进行过滤。
类似于 Array.prototype.filter(), 它只会发出源 Observable 中符合标准函数的值。
类似于大家所熟知的 Array.prototype.filter
方法,此操作符从源 Observable 中
接收值,将值传递给 predicate
函数并且只发出返回 true
的这些值。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksOnDivs = clicks.filter(ev => ev.target.tagName === 'DIV');
clicksOnDivs.subscribe(x => console.log(x));
public find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T> source
只发出源 Observable 所发出的值中第一个满足条件的值。
找到第一个通过测试的值并将其发出。
find
会查找源 Observable 中与 predicate
函数体现的指定条件匹配的第一项,然后
将其返回。不同于 first,在 find
中 predicate
是必须的,而且如果没找到
有效的值的话也不会发出错误。
Params:
Name | Type | Attribute | Description |
predicate | function(value: T, index: number, source: Observable<T>): boolean | 使用每项来调用的函数,用于测试是否符合条件。 |
|
thisArg | any |
|
可选参数,用来决定 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.find(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));
public findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T> source
只发出源 Observable 所发出的值中第一个满足条件的值的索引。
它很像 find , 但发出的是找到的值的索引, 而不是值本身。
findIndex
会查找源 Observable 中与 predicate
函数体现的指定条件匹配的第一项,然后
返回其索引(从0开始)。不同于 first,在 findIndex
中 predicate
是必须的,而且如果没找到
有效的值的话也不会发出错误。
Params:
Name | Type | Attribute | Description |
predicate | function(value: T, index: number, source: Observable<T>): boolean | 使用每项来调用的函数,用于测试是否符合条件。 |
|
thisArg | any |
|
可选参数,用来决定 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.findIndex(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));
public first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R> source
只发出由源 Observable 所发出的值中第一个(或第一个满足条件的值)。
只发出第一个值。或者只发出第一个通过测试的值。
如果不使用参数调用,first
会发出源 Observable 中的第一个值,然后完成。如果使用
predicate
函数来调用,first
会发出源 Observable 第一个满足条件的值。它还可以
接收 resultSelector
函数根据输入值生成输出值,假如在源 Observable 完成前无法发
出一个有效值的话,那么会发出 defaultValue
。如果没有提供 defaultValue
并且也
找不到匹配的元素,则抛出错误。
Params:
Name | Type | Attribute | Description |
predicate | function(value: T, index: number, source: Observable<T>): boolean |
|
使用每项来调用的可选函数,用于测试是否符合条件。 |
resultSelector | function(value: T, index: number): R |
|
函数,它基于源 Observable 的值和索引来生成输出 Observable 的值。传给这个函数的参数有:
|
defaultValue | R |
|
假如在源 Observable 上没有找到有效值,就会发出这个 默认值。 |
Return:
Observable<T | R> | 符合条件的第一项的 Observable 。 |
Throw:
如果在 Observable 完成之前还没有发出任何 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first();
result.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));
public forEach(next: Function, PromiseCtor: PromiseConstructor): Promise source
Params:
Name | Type | Attribute | Description |
next | Function | observable 发出的每个值的处理器。 |
|
PromiseCtor | PromiseConstructor |
|
用来生成 Promise 的构造函数。 |
public groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>> source
根据指定条件将源 Observable 发出的值进行分组,并将这些分组作为 GroupedObservables
发出,每一个分组都是一个 GroupedObservable 。
Params:
Name | Type | Attribute | Description |
keySelector | function(value: T): K | 提取每项的键的函数。 |
|
elementSelector | function(value: T): R |
|
提取每项返回元素的函数。 |
durationSelector | function(grouped: GroupedObservable<K, R>): Observable<any> |
|
返回一个 Observable 来确定每个组应该存在多长时间的函数。 |
Return:
Observable<GroupedObservable<K, R>> | 发出 GroupedObservables 的 Observable, 每个 GroupedObservable 对应唯一的键值,并且会发出源 Observable 中共享该键值的项。 |
Example:
Observable.of<Obj>({id: 1, name: 'aze1'},
{id: 2, name: 'sf2'},
{id: 2, name: 'dg2'},
{id: 1, name: 'erg1'},
{id: 1, name: 'df1'},
{id: 2, name: 'sfqfb2'},
{id: 3, name: 'qfs3'},
{id: 2, name: 'qsgqsfg2'}
)
.groupBy(p => p.id)
.flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], []))
.subscribe(p => console.log(p));
// 显示:
// [ { id: 1, name: 'aze1' },
// { id: 1, name: 'erg1' },
// { id: 1, name: 'df1' } ]
//
// [ { id: 2, name: 'sf2' },
// { id: 2, name: 'dg2' },
// { id: 2, name: 'sfqfb2' },
// { id: 2, name: 'qsgqsfg2' } ]
//
// [ { id: 3, name: 'qfs3' } ]
Observable.of<Obj>({id: 1, name: 'aze1'},
{id: 2, name: 'sf2'},
{id: 2, name: 'dg2'},
{id: 1, name: 'erg1'},
{id: 1, name: 'df1'},
{id: 2, name: 'sfqfb2'},
{id: 3, name: 'qfs1'},
{id: 2, name: 'qsgqsfg2'}
)
.groupBy(p => p.id, p => p.name)
.flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], ["" + group$.key]))
.map(arr => ({'id': parseInt(arr[0]), 'values': arr.slice(1)}))
.subscribe(p => console.log(p));
// 显示:
// { id: 1, values: [ 'aze1', 'erg1', 'df1' ] }
// { id: 2, values: [ 'sf2', 'dg2', 'sfqfb2', 'qsgqsfg2' ] }
// { id: 3, values: [ 'qfs1' ] }
public last(predicate: function): Observable source
返回的 Observable 只发出由源 Observable 发出的最后一个值。它可以接收一个可选的 predicate 函数作为 参数,如果传入 predicate 的话则发送的不是源 Observable 的最后一项,而是发出源 Observable 中 满足 predicate 函数的最后一项。
Params:
Name | Type | Attribute | Description |
predicate | function | 任何由源 Observable 发出的项都必须满足的条件函数。 |
Return:
Observable | 该 Observable 只发出源 Observable 中满足给定条件的最后一项, 或者没有任何项满足条件时发出 NoSuchElementException 。 |
Throw:
如果 Observale 完成前还没有发出任何 |
|
* |
如果在源 Observable 中没有匹配 predicate 函数的项,则抛出。 |
public letProto(func: *): Observable<R> source
Params:
Name | Type | Attribute | Description |
func | * |
public lift(operator: Operator): Observable source
创建一个新的 Observable,以它作为源,并传递操作符的定义作为新的 observable 操作符。
Params:
Name | Type | Attribute | Description |
operator | Operator | 定义了如何操作 observable 的操作符。 |
public map(project: function(value: T, index: number): R, thisArg: any): Observable<R> source
将给定的 project
函数应用于源 Observable 发出的每个值,并将结果值作为
Observable 发出。
类似于 Array.prototype.map(), 它把每个源值传递给转化函数以获得相应的输出值。
类似于大家所熟知的 Array.prototype.map
方法,此操作符将投射函数应用于每个值
并且在输出 Observable 中发出投射后的结果。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));
Test:
public mapTo(value: any): Observable source
每次源 Observble 发出值时,都在输出 Observable 上发出给定的常量值。
类似于 map,但它每一次都把源值映射成同一个输出值。
接收常量 value
作为参数,并每当源 Observable 发出值时都发出这个值。换句话说,
就是忽略实际的源值,然后简单地使用这个发送时间点以知道何时发出给定的 value
。
Params:
Name | Type | Attribute | Description |
value | any | 将每个源值映射成的值。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var greetings = clicks.mapTo('Hi');
greetings.subscribe(x => console.log(x));
See:
public materialize(): Observable<Notification<T>> source
表示源 Observable 中的所有通知,每个通知都会在 Notification 对象中标记为
它们原始的通知类型,并会作为输出 Observable 的 next
通知。
在 Notification 对象中包装 next
、error
和 complete
发送, 并在输出 Observable 上作为 next
发送出去。
materialize
返回一个 Observable,这个 Observable 会为每个源 Observable 的
next
、error
或 complete
通知发出 next
通知。当源 Observable 发出 complete
时,
输出 Observable 会发出 next
并且 Notification 类型为 "complete",然后它也发出
complete
。当源 Observable 发出 error
时,输出 Observable 会发出 next
并且
Notification 类型为 "error",然后发出 complete
。
该操作符对于生成源 Observable 的元数据很有用,并作为 next
发送使用掉。
与 dematerialize 结合使用。
Return:
Observable<Notification<T>> | 该 Observable 会发出 Notification 对象, 该对象包装了来自源 Observable 的带有元数据的原始通知。 |
Example:
var letters = Rx.Observable.of('a', 'b', 13, 'd');
var upperCase = letters.map(x => x.toUpperCase());
var materialized = upperCase.materialize();
materialized.subscribe(x => console.log(x));
// 结果如下:
// - Notification {kind: "N", value: "A", error: undefined, hasValue: true}
// - Notification {kind: "N", value: "B", error: undefined, hasValue: true}
// - Notification {kind: "E", value: undefined, error: TypeError:
// x.toUpperCase is not a function at MapSubscriber.letters.map.x
// [as project] (http://1…, hasValue: false}
public max(comparer: Function): Observable source
max
操作符操作的 Observable 发出数字(或可以与提供的函数进行比较的项)并且当源 Observable 完成时它发出单一项:最大值的项。
Params:
Name | Type | Attribute | Description |
comparer | Function |
|
可选的比较函数,用它来替代默认值来比较两项的值。 |
Example:
Rx.Observable.of(5, 4, 7, 2, 8)
.max()
.subscribe(x => console.log(x)); // -> 8
interface Person {
age: number,
name: string
}
Observable.of<Person>({age: 7, name: 'Foo'},
{age: 5, name: 'Bar'},
{age: 9, name: 'Beer'})
.max<Person>((a: Person, b: Person) => a.age < b.age ? -1 : 1)
.subscribe((x: Person) => console.log(x.name)); // -> 'Beer'
}
Test:
See:
public merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable source
创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中的所有值。
通过把多个 Observables 的值混合到一个 Observable 中 来将其打平。
merge
订阅每个给定的输入 Observable (给定的源或作为参数的 Observable ),然后只是
将所有输入 Observables 的所有值发送(不进行任何转换)到输出 Observable 。所有的输入
Observable 都完成了,输出 Observable 才能完成。任何由输入 Observable 发出的错误都
会立即在输出 Observalbe 上发出。
Params:
Name | Type | Attribute | Description |
other | ObservableInput | 可以与源 Observable 合并的输入 Observable 。 可以给定多个输入 Observables 作为参数。 |
|
concurrent | number |
|
可以同时订阅的输入 Observables 的最大数量。 |
scheduler | Scheduler |
|
用来管理输入 Observables 的并发性的 调度器。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = clicks.merge(timer);
clicksOrTimer.subscribe(x => console.log(x));
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // 参数
var merged = timer1.merge(timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));
public mergeAll(concurrent: number): Observable source
将高阶 Observable 转换成一阶 Observable ,一阶 Observable 会同时发出在内部 Observables 上发出的所有值。
打平高阶 Observable 。
mergeAll
订阅发出 Observables 的 Observalbe ,也称为高阶 Observable 。
每当观察到发出的内部 Observable 时,它会订阅并发出输出 Observable 上的这个
内部 Observable 的所有值。所有的内部 Observable 都完成了,输出 Observable
才能完成。任何由内部 Observable 发出的错误都会立即在输出 Observalbe 上发出。
Params:
Name | Type | Attribute | Description |
concurrent | number |
|
可以同时订阅的输入 Observables 的最大数量。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var firstOrder = higherOrder.mergeAll();
firstOrder.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(10));
var firstOrder = higherOrder.mergeAll(2);
firstOrder.subscribe(x => console.log(x));
public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable source
将每个源值投射成 Observable ,该 Observable 会合并到输出 Observable 中。
将每个值映射成 Observable ,然后使用 mergeAll 打平所有的内部 Observables 。
返回的 Observable 基于应用一个函数来发送项,该函数提供给源 Observable 发出的每个项, 并返回一个 Observable,然后合并这些作为结果的 Observable,并发出本次合并的结果。
Params:
Name | Type | Attribute | Description |
project | function(value: T, ?index: number): ObservableInput | 函数,
|
|
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any |
|
|
concurrent | number |
|
可以同时订阅的输入 Observables 的最大数量。 |
Return:
Observable | 该 Observable 发出由源 Observable 发出的每项应用投射函数
(和可选的 |
Example:
var letters = Rx.Observable.of('a', 'b', 'c');
var result = letters.mergeMap(x =>
Rx.Observable.interval(1000).map(i => x+i)
);
result.subscribe(x => console.log(x));
// 结果如下:
// a0
// b0
// c0
// a1
// b1
// c1
// 继续列出a、b、c加上各自的自增数列
public mergeMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable source
将每个源值投射成同一个 Observable ,该 Observable 会多次合并到输出 Observable 中。
它很像 mergeMap,但永远将每个值映射到同一个内部 Observable 。
将每个源值映射成给定的 Observable :innerObservable
,而无论源值是什么,然后
将这些结果 Observables 合并到单个的 Observable ,也就是输出 Observable 。
Params:
Name | Type | Attribute | Description |
innerObservable | ObservableInput | 用来替换源 Observable 中的每个值 的 Observable 。 |
|
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any |
|
|
concurrent | number |
|
可以同时订阅的输入 Observables 的最大数量。 |
Return:
Observable | 每次源 Observable 发出值时,该 Observable 发出来自
给定 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.mergeMapTo(Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
public mergeScan(accumulator: function(acc: R, value: T): Observable<R>, seed: *, concurrent: number): Observable<R> source
在源 Observable 上应用 accumulator 函数,其中 accumulator 函数本身返回 Observable ,然后每个返回的中间 Observable 会被合并到输出 Observable 中。
它很像 scan,但 accumulator 函数返回的 Observables 会被合并到外部 Observalbe 中。
Params:
Name | Type | Attribute | Description |
accumulator | function(acc: R, value: T): Observable<R> | 在每个源值上调用的累加器函数。 |
|
seed | * | 初始的累加值。 |
|
concurrent | number |
|
可以同时订阅的输入 Observables 的最大数量。 |
Example:
const click$ = Rx.Observable.fromEvent(document, 'click');
const one$ = click$.mapTo(1);
const seed = 0;
const count$ = one$.mergeScan((acc, one) => Rx.Observable.of(acc + one), seed);
count$.subscribe(x => console.log(x));
// 结果:
1
2
3
4
// ...以此类推,每次点击计数增加1
public min(comparer: Function): Observable<R> source
min
操作符操作的 Observable 发出数字(或可以使用提供函数进行比较的项)并且当源 Observable 完成时它发出单一项:最小值的项。
Params:
Name | Type | Attribute | Description |
comparer | Function |
|
可选的比较函数,用它来替代默认值来比较两项的值。 |
Example:
Rx.Observable.of(5, 4, 7, 2, 8)
.min()
.subscribe(x => console.log(x)); // -> 2
interface Person {
age: number,
name: string
}
Observable.of<Person>({age: 7, name: 'Foo'},
{age: 5, name: 'Bar'},
{age: 9, name: 'Beer'})
.min<Person>( (a: Person, b: Person) => a.age < b.age ? -1 : 1)
.subscribe((x: Person) => console.log(x.name)); // -> 'Bar'
}
Test:
See:
public multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable source
返回的 Observable 发出对 ConnectableObservable 发出的项调用一个指定的 selector 函数的结果, ConnectableObservable 可以在潜在的多个流之间共享单个 subscription 。
Return:
Observable | 该 Observable 发出对 ConnectableObservable 发出的项调用 selector 函数的结果, ConnectableObservable 可以在潜在的多个流之间共享单个 subscription 。 |
public observeOn(scheduler: IScheduler, delay: number): Observable<T> source
使用指定的调度器来重新发出源 Observable 的所有通知。
确保从 Observable 的外部使用特定的调度器。
observeOn
操作符接收一个 scheduler 作为第一个参数,它将用于重新安排源 Observable 所发送的通知。如果你不能控制
给定 Observable 的内部调度器,但是想要控制何时发出值,那么这个操作符可能是有用的。
返回的 Observable 发出与源 Observable 相同的通知(next
、complete
和 error
),但是使用提供的调度器进行了重新安排。
注意,这并不意味着源 Observables 的内部调度器会以任何形式被替换。原始的调度器仍然会被使用,但是当源 Observable 发出
通知时,它会立即重新安排(这时候使用传给 observeOn
的调度器)。在同步地发出大量的值的 Observalbe 上调用 observeOn
是一种反模式,这会将 Observable 的发送分解成异步块。为了实现这一点,调度器必须直接传递给源 Observable (通常是创建它的操作符)。
observeOn
只是简单地像通知延迟一些,以确保这些通知在预期的时间点发出。
事实上,observeOn
接收第二个参数,它以毫秒为单位指定延迟通知的发送时间。observeOn
与 delay
操作符最主要的区别是它会延迟所有通知,包括错误通知,而 delay
会当源 Observable 发出错误时立即通过错误。
通常来说,对于想延迟流中的任何值,强烈推荐使用 delay
操作符,而使用 observeOn
时,用来指定应该使用
哪个调度器来进行通知发送。
Params:
Name | Type | Attribute | Description |
scheduler | IScheduler | 用于重新安排源 Observable 的通知的调度器。 |
|
delay | number |
|
应该重新安排的每个通知的延迟时间的毫秒数。 |
Example:
const intervals = Rx.Observable.interval(10); // 默认情况下,interval 使用异步调度器进行调度
intervals
.observeOn(Rx.Scheduler.animationFrame) // 但我们将在 animationFrame 调度器上进行观察,
.subscribe(val => { // 以确保动画的流畅性。
someDiv.style.height = val + 'px';
});
See:
public onErrorResumeNext(observables: ...ObservableInput): Observable source
当任何提供的 Observable 发出完成或错误通知时,它会立即地订阅已传入下一个 Observable 。
无论发生什么,都会执行一系列的 Observables ,即使这意味着要吞咽错误。
onErrorResumeNext
操作符接收一系列的 Observables ,可与直接作为参数或数组提供。如果没提供 Observable ,
返回的 Observable 与源 Observable 的行为是相同的。
onErrorResumeNext
返回的 Observable 通过订阅和重新发出源 Observable 的值开始。当流的值完成时,无论 Observable
是完成还是发出错误,onErrorResumeNext
都会订阅作为参数传给该方法的第一个 Observable 。它也会开始重新发出它的值,
再一次,当流完成时,onErrorResumeNext
又会继续订阅已提供的系列 Observable 中另一个,无论前一个 Observable
是否完成或发生错误。这样的行为会持续到系列中没有更多的 Observable ,返回的 Observale 将在此时完成,即使最后订阅的
流是以错误结束的。
因此, onErrorResumeNext
可以认为是某个版本的 concat 操作符,只是当它的输入 Observables 发生
错误时,它更为宽容。然而,concat
只有当前一个 Observable 成功完成了,它才会订阅系列中的下个 Observable ,
而 onErrorResumeNext
即使是以错误完成某个 Observalbe 时,它也会订阅下一个。
注意,对于由 Observables 发出的错误,你无法获得访问权限。特别是不要指望可以将这些出现在错误回调函数中的 错误传递给 subscribe 。如果要根据 Observable 发出的错误采取特定的操作,应该尝试使用 catch 。
Params:
Name | Type | Attribute | Description |
observables | ...ObservableInput | 传入的 Observables,可以直接传入,也可以作为数组传入。 |
Return:
Observable | 该 Observable 发出源 Observable 的值,但如果发出错误,它会订阅下一个传入的 Observable ,并以此类推,直到它完成或用完所有的 Observable 。 |
Example:
Rx.Observable.of(1, 2, 3, 0)
.map(x => {
if (x === 0) { throw Error(); }
return 10 / x;
})
.onErrorResumeNext(Rx.Observable.of(1, 2, 3))
.subscribe(
val => console.log(val),
err => console.log(err), // 永远不会调用
() => console.log('that\'s it!')
);
// 输出:
// 10
// 5
// 3.3333333333333335
// 1
// 2
// 3
// "that's it!"
public pairwise(): Observable<Array<T>> source
将一系列连续的发送成对的组合在一起,并将这些分组作为两个值的数组发出。
将当前值和前一个值作为数组放在一起,然后将其发出。
源 Observable 的第N个发送会使输出 Observable 发出一个数组 [(N-1)th, Nth],即前一个
值和当前值的数组,它们作为一对。出于这个原因,pairwise
发出源 Observable 的
第二个和随后的发送,而不发送第一个,因为它没有前一个值。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var pairs = clicks.pairwise();
var distance = pairs.map(pair => {
var x0 = pair[0].clientX;
var y0 = pair[0].clientY;
var x1 = pair[1].clientX;
var y1 = pair[1].clientY;
return Math.sqrt(Math.pow(x0 - x1, 2) + Math.pow(y0 - y1, 2));
});
distance.subscribe(x => console.log(x));
See:
public partition(predicate: function(value: T, index: number): boolean, thisArg: any): [Observable<T>, Observable<T>] source
将源 Observable 一分为二,一个是所有满足 predicate 函数的值,另一个是所有 不满足 predicate 的值。
它很像 filter,但是返回两个 Observables : 一个像 filter 的输出, 而另一个是所有不符合条件的值。
partition
输出有两个 Observables 的数组,这两个 Observables 是通过给定的 predicate
函数将源 Observable 的值进行划分得到的。该数组的第一个 Observable 发出 predicate 参数
返回 true 的源值。第二个 Observable 发出 predicate 参数返回 false 的源值。第一个像是
filter ,而第二个像是 predicate 取反的 filter 。
Return:
[Observable<T>, Observable<T>] | 有两个 Observables 的数组: 一个是通过 predicate 函数的所有值,另一个是没有通过 predicate 的所有值。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var parts = clicks.partition(ev => ev.target.tagName === 'DIV');
var clicksOnDivs = parts[0];
var clicksElsewhere = parts[1];
clicksOnDivs.subscribe(x => console.log('DIV clicked: ', x));
clicksElsewhere.subscribe(x => console.log('Other clicked: ', x));
See:
public pluck(properties: ...string): Observable source
将每个源值(对象)映射成它指定的嵌套属性。
类似于 map,但仅用于选择每个发出对象的某个嵌套属性。
给定描述对象属性路径的字符串的列表,然后从源 Observable 中的所有值中检索指定嵌套
属性的值。如果属性无法解析,它会返回 undefined
。
Params:
Name | Type | Attribute | Description |
properties | ...string | 从每个源值(对象啊)中提取的嵌套属性。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var tagNames = clicks.pluck('target', 'tagName');
tagNames.subscribe(x => console.log(x));
See:
public publish(selector: Function): * source
返回 ConnectableObservable,它是 Observable 的变种,它会一直等待,直到 connnect 方法被调用才会开始把值发送给那些订阅它的观察者。
Params:
Name | Type | Attribute | Description |
selector | Function |
|
可选的选择器函数,可以根据需要多次使用以多播源序列,而不会导致源序列 生成多个 subscriptions 。给定源的订阅者会从订阅开始的一刻起,接收源的所有通知。 |
Return:
* | ConnectableObservable,一旦连接,源 Observable 便会向它的观察者发出项。 |
public publishBehavior(value: *): ConnectableObservable<T> source
Params:
Name | Type | Attribute | Description |
value | * |
public publishLast(): ConnectableObservable<T> source
public publishReplay(bufferSize: *, windowTime: *, scheduler: *): ConnectableObservable<T> source
Params:
Name | Type | Attribute | Description |
bufferSize | * | ||
windowTime | * | ||
scheduler | * |
public race(): Observable source
返回 Observable,该 Observable 是源 Observable 和提供的 Observables 的组合中 第一个发出项的 Observable 的镜像。
Params:
Name | Type | Attribute | Description |
...observables | ...Observables | 用于竞争的 Observables 源,以比试哪个 Observable 会首先发出项。 |
public reduce(accumulator: function(acc: R, value: T, index: number): R, seed: R): Observable<R> source
在源 Observalbe 上应用 accumulator (累加器) 函数,然后当源 Observable 完成时,返回 累加的结果,可以提供一个可选的 seed 值。
使用 accumulator (累加器) 函数将源 Observable 所发出的所有值归并在一起, 该函数知道如何将新的源值纳入到过往的累加结果中。
类似于 Array.prototype.reduce(),
reduce
可以对累加值和源 Observable (过去的)的每个值应用 accumulator
函数,
然后将其归并成一个值并且在输出 Observable 上发出。注意,reduce
只会发出一个值,
并且是当源 Observable 完成时才发出。它等价于使用 scan 操作符后面再跟
last 操作符。
返回的 Observable 为由源 Observable 发出的每项应用指定的 accumulator
函数。
如果指定了 seed
值,那么这个值会作为 accumulator
函数的初始值。如果没有指定
seed
值,那么源中的第一项会作为 seed
来使用。
Example:
var clicksInFiveSeconds = Rx.Observable.fromEvent(document, 'click')
.takeUntil(Rx.Observable.interval(5000));
var ones = clicksInFiveSeconds.mapTo(1);
var seed = 0;
var count = ones.reduce((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));
public repeat(count: number): Observable source
返回的 Observable 重复由源 Observable 所发出的项的流,最多可以重复 count 次。
Params:
Name | Type | Attribute | Description |
count | number |
|
源 Observable 项重复的次数,如果 count 为0则产生一个空的 Observable 。 |
public repeatWhen(notifier: function(notifications: Observable): Observable): Observable source
返回的 Observalbe 是源 Observable 的镜像,除了 complete
。如果源 Observable 调用了 complete
,这个方法会发出给 notifier
返回的 Observable 。如果这个 Observale 调用了 complete
或 error
,那么这个方法会在子 subscription 上调用
complete
或 error
。否则,此方法将重新订阅源 Observable。
Params:
Name | Type | Attribute | Description |
notifier | function(notifications: Observable): Observable | 接收 Observable 的通知,用户可以该通知
的 |
public retry(count: number): Observable source
返回一个 Observable, 该 Observable 是源 Observable 不包含错误异常的镜像。 如果源 Observable 发生错误, 这个方法不会传播错误而是会不 断的重新订阅源 Observable 直到达到最大重试次数 (由数字参数指定)。
任何所有被源 Observable 发出的数据项都会被做为结果的 Observable 发出, 即使这些发送是在失败的订阅期间。 举个例子, 如果一个 Observable
第一次发送[1, 2]后失败了,紧接着第二次成功的发出: [1, 2, 3, 4, 5]后触发完成, 最后发送流和通知为: [1, 2, 1, 2, 3, 4, 5, complete
]。
Params:
Name | Type | Attribute | Description |
count | number | 在失败之前重试的次数。 |
public retryWhen(notifier: function(errors: Observable): Observable): Observable source
返回一个 Observable, 该 Observable 是源 Observable 不包含错误异常的镜像。 如果源头 Observable 触发
error
, 这个方法会发出引起错误的 Throwable 给 notifier
返回的 Observable。 如果该 Observable 触发 complete
或者 error
则该方法会使子订阅触发 complete
和 error
。 否则该方法会重新订阅源 Observable。
Params:
Name | Type | Attribute | Description |
notifier | function(errors: Observable): Observable | 接受一个用户可以 |
public sample(notifier: Observable<any>): Observable<T> source
发出源 Observable 最新发出的值当另一个 notifier
Observable发送时。
就像是 sampleTime, 但是无论何时notifier
Observable
进行了发送都会去取样。
无论何时 notifier
Observable 发出一个值或者完成, sample
会去源 Observable 中发送上次
取样后源 Observable 发出的最新值, 除非源在上一次取样后没有发出值。 notifier
会被订阅只要输出
Observable 被订阅。
Params:
Name | Type | Attribute | Description |
notifier | Observable<any> | 被用来取样的源 Observable。 |
Example:
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = seconds.sample(clicks);
result.subscribe(x => console.log(x));
public sampleTime(period: number, scheduler: Scheduler): Observable<T> source
在周期时间间隔内发出源 Observable 发出的最新值。
在周期时间间隔内取样源 Observable , 发出取样的。
sampleTime
周期性的查看源 Observable 并且发出上次取样后发出的最新的值, 除非上次取样后
就没有再发出数据了。 取样在每个周期毫秒(或者时间单位由可选的调度器参数决定)内定期发生。 只要
输出 Observable 被订阅取样就开始。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.sampleTime(1000);
result.subscribe(x => console.log(x));
public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R> source
对源 Observable 使用累加器函数, 返回生成的中间值, 可选的初始值。
就想是 reduce, 但是发出目前的累计数当源发出数据的时候。
将所有源发出的数据结合起来, 使用一个累加器函数,该函数知道如何将新的值加入到累加器中。 这就像是reduce, 但是会发出中间的累加值。
返回一个 Observable, 该 Observable 对每个源 Observable 发出的值使用特定的累加器。
如果seed
值提供了, 这个值会被累加器用作初始值。 如果seed
值没有被提供, 源数据的第一项会被当做初始值。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var ones = clicks.mapTo(1);
var seed = 0;
var count = ones.scan((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));
public sequenceEqual(compareTo: Observable, comparor: function): Observable source
使用可选的比较函数,按顺序比较两个 Observables 的所有值,然后返回单个布尔值的 Observable, 以表示两个序列是否相等。 按顺序检查两个 Observables 所发出的所有值是否相等。
sequenceEqual
订阅两个 observables 并且缓冲每个 observable 发出的值。 当任何一个 observable 发出数据, 该值会被缓冲
并且缓冲区从底部向上移动和比较; 如果任何一对值不匹配, 返回的 observable 会发出 false
和完成。 如果其中一个 observables 完
成了, 操作符会等待另一个 observable 完成; 如果另一个 observable 在完成之前又发出了数据, 返回 observable 会发出 false
和完
成。 如果其中一个 observable 永远不会完成或者在另一个完成后还发出数据, 返回的 observable 永远不会结束。
Params:
Name | Type | Attribute | Description |
compareTo | Observable | 用来与源 Observable 进行比较的 Observable 序列。 |
|
comparor | function |
|
用来比较每一对值的比较函数。 |
Example:
var code = Rx.Observable.from([
"ArrowUp",
"ArrowUp",
"ArrowDown",
"ArrowDown",
"ArrowLeft",
"ArrowRight",
"ArrowLeft",
"ArrowRight",
"KeyB",
"KeyA",
"Enter" // no start key, clearly.
]);
var keys = Rx.Observable.fromEvent(document, 'keyup')
.map(e => e.code);
var matches = keys.bufferCount(11, 1)
.mergeMap(
last11 =>
Rx.Observable.from(last11)
.sequenceEqual(code)
);
matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));
public share(): Observable<T> source
返回一个新的 Observable,该 Observable 多播(共享)源 Observable。 至少要有一个订阅者,该 Observable 才会被订阅并发出数据。
当所有的订阅者都取消订阅了,它会取消对源 Observable 的订阅。 因为 Observable 是多路传播的它使得流是 hot
。
它是 `.publish().refCount()` 的别名。
public shareReplay(bufferSize: *, windowTime: *, scheduler: *): * source
Params:
Name | Type | Attribute | Description |
bufferSize | * | ||
windowTime | * | ||
scheduler | * |
Return:
* |
public single(predicate: Function): Observable<T> source
该 Observable 发出源 Observable 所发出的值中匹配指定 predicate 函数的单个项。 如果源 Observable 发出多于1个数据项或者没有发出数据项, 分别以 IllegalArgumentException 和 NoSuchElementException 进行通知。
Params:
Name | Type | Attribute | Description |
predicate | Function | 断言函数,用来评估源 Observable 的数据项。 |
Throw:
如果 Observable 在发送任何 |
public skip(count: Number): Observable source
返回一个 Observable, 该 Observable 跳过源 Observable 发出的前N个值(N = count)。
Params:
Name | Type | Attribute | Description |
count | Number | 由源 Observable 所发出项应该被跳过的次数。 |
public skipLast(count: number): Observable<T> source
跳过源 Observable 最后发出的的N个值 (N = count)。
skipLast
返回一个 Observable,该 Observable 累积足够长的队列以存储最初的N个值 (N = count)。
当接收到更多值时,将从队列的前面取值并在结果序列上产生。 这种情况下值会被延时。
Params:
Name | Type | Attribute | Description |
count | number | 源 Observable 中从后往前要跳过的值的数量。 |
Throw:
当使用 |
Example:
var many = Rx.Observable.range(1, 5);
var skipLastTwo = many.skipLast(2);
skipLastTwo.subscribe(x => console.log(x));
// Results in:
// 1 2 3
public skipUntil(notifier: Observable): Observable<T> source
返回一个 Observable,该 Observable 会跳过源 Observable 发出的值直到第二个 Observable 开始发送。
Params:
Name | Type | Attribute | Description |
notifier | Observable | 第二个 Observable ,它发出后,结果 Observable 开始镜像源 Observable 的元素。 |
public skipWhile(predicate: Function): Observable<T> source
返回一个 Observable, 该 Observable 会跳过由源 Observable 发出的所有满足指定条件的数据项, 但是一旦出现了不满足条件的项,则发出在此之后的所有项。
Params:
Name | Type | Attribute | Description |
predicate | Function | 函数,用来测试源 Observable 发出的每个数据项。 |
public startWith(values: ...T, scheduler: Scheduler): Observable source
返回的 Observable 会先发出作为参数指定的项,然后再发出由源 Observable 所发出的项。
Params:
Name | Type | Attribute | Description |
values | ...T | 你希望修改过的 Observable 可以先发出的项。 |
|
scheduler | Scheduler |
|
用于调度 |
public subscribe(observerOrNext: Observer | Function, error: Function, complete: Function): ISubscription source
调用 Observable 的执行并注册 Observer 的处理器以便于发出通知。
当你拥有这些 Observables 却仍然什么也没发生时使用它。
subscribe
不是一个常规的操作符,而是方法,它调用 Observables 内部的 subscribe 函数。它也许是一个你传递给 create
静态工厂的方法,但是大多数情况下它是一个库的实现,它定义了 Observable 什么时候发出,发出什么。这意味着实际上是 Observable 开始工
作的那一刻才调用 subscribe , 而不是像人们经常认为的那样,即创建 Observable 的时候。
除了开始 Observable 的执行,该方法允许你监听 Observable 发出的值,也包括完成或者发生错误。你可以通过以下两种方式达到 这种目的。
第一种方式是创建一个实现了 Observer 接口的对象。它应该实现接口定义的方法,但是要注意的是它仅仅是一个普通的 JavaScript
对象,你可以用任何你想要的方式创建(ES6 class, 常见的构造函数, 对象字面量等等)。 特别地,不要尝试使用任何 RxJS 的实现细节去创建
Observers-你不需要这样做。 同样要记住,你的对象不需要实现所有的方法。如果你发现自己创建一个不做任何事情的方法,你可以简化它。
不过要注意,如果 error
方法没用被提供,所有的错误都不会被捕获。
第二种方式是放弃 Observer 对象,只需提供回调函数来替代它的方法。这意味你可以给 subscribe
提供3个方法作为参数, 第一个回调
等价于 next
方法,第二个等价于 error
方法,第三个等价于 complete
方法。就如同 Observer 一样,如果你不需要监听某中某个,你可以
省略该函数,通过传递 undefined
或者 null
,因为 subscribe
可以通过在函数调用中的位置识别了这些函数。提到 error
函数,正如上文所述,
如果没有提供,Observable 发出的错误会被抛弃。
不管你使用了哪种方式调用 subscribe
,所有情况都返回 Subscription 对象。该对象允许你调用 unsubscribe
,该方法会停止 Observable
的工作并且清理 Observable 持有的资源。注意,取消订阅不会调用 subscribe
提供的 complete
回调函数,complete
回调函数是为来自 Observable 的常规完成信号保留的。
记住,提供给 subscribe
的回调函数无法保证是被异步地调用。是 Observable 自身决定这些方法的执行。例如:of
默认同步地发出所有的值。经常查看文档以确认给定的 Observable 被订阅时的行为是怎样的,以及它的默认行为是否可以通过使用 Scheduler 进行更改。
Return:
ISubscription | 注册处理程序的订阅引用。 |
Example:
const sumObserver = {
sum: 0,
next(value) {
console.log('Adding: ' + value);
this.sum = this.sum + value;
},
error() { // We actually could just remote this method,
}, // since we do not really care about errors right now.
complete() {
console.log('Sum equals: ' + this.sum);
}
};
Rx.Observable.of(1, 2, 3) // 同步发出 1, 2, 3,然后完成。
.subscribe(sumObserver);
// 日志:
// "Adding: 1"
// "Adding: 2"
// "Adding: 3"
// "Sum equals: 6"
let sum = 0;
Rx.Observable.of(1, 2, 3)
.subscribe(
function(value) {
console.log('Adding: ' + value);
sum = sum + value;
},
undefined,
function() {
console.log('Sum equals: ' + sum);
}
);
// 日志:
// "Adding: 1"
// "Adding: 2"
// "Adding: 3"
// "Sum equals: 6"
const subscription = Rx.Observable.interval(1000).subscribe(
num => console.log(num),
undefined,
() => console.log('completed!') // 即使当取消订阅时,也不会被调用
);
setTimeout(() => {
subscription.unsubscribe();
console.log('unsubscribed!');
}, 2500);
// Logs:
// 0 after 1s
// 1 after 2s
// "unsubscribed!" after 2,5s
public subscribeOn(scheduler: Scheduler): Observable<T> source
使用指定的 IScheduler 异步地订阅此 Observable 的观察者。
Params:
Name | Type | Attribute | Description |
scheduler | Scheduler | 执行 subscription 操作的 IScheduler 。 |
public switch(): Observable<T> source
通过只订阅最新发出的内部 Observable ,将高阶 Observable 转换成一阶 Observable 。
一旦有新的内部 Observable 出现,通过丢弃前一个,将 高级 Observable 打平。
switch
订阅发出 Observables 的 Observable,也就是高阶 Observable 。
每次观察到这些已发出的内部 Observables 中的其中一个时,输出 Observable 订阅
这个内部 Observable 并开始发出该 Observable 所发出的项。到目前为止,
它的行为就像 mergeAll 。然而,当发出一个新的内部 Observable 时,
switch
会从先前发送的内部 Observable 那取消订阅,然后订阅新的内部 Observable
并开始发出它的值。后续的内部 Observables 也是如此。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
// 每次点击事件都会映射成间隔1秒的 interval Observable
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var switched = higherOrder.switch();
// 结果是 `switched` 本质上是一个每次点击时会重新启动的计时器。
// 之前点击产生的 interval Observables 不会与当前的合并。
switched.subscribe(x => console.log(x));
public switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source
将每个源值投射成 Observable,该 Observable 会合并到输出 Observable 中, 并且只发出最新投射的 Observable 中的值。
将每个值映射成 Observable ,然后使用 switch 打平所有的内部 Observables 。
返回的 Observable 基于应用一个函数来发送项,该函数提供给源 Observable 发出的每个项,
并返回一个(所谓的“内部”) Observable 。每次观察到这些内部 Observables 的其中一个时,
输出 Observable 将开始发出该内部 Observable 所发出的项。当发出一个新的内部
Observable 时,switchMap
会停止发出先前发出的内部 Observable 并开始发出新的内部
Observable 的值。后续的内部 Observables 也是如此。
Params:
Name | Type | Attribute | Description |
project | function(value: T, ?index: number): ObservableInput | 函数, 当应用于源 Observable 发出的项时,返回一个 Observable 。 |
|
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any |
|
函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。 传递给这个函数参数有:
|
Return:
Observable | 该 Observable 发出由源 Observable 发出的每项应用投射函数
(和可选的 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.switchMap((ev) => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
public switchMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source
将每个源值投射成同一个 Observable ,该 Observable 会使用 switch 多次被打平 到输出 Observable 中。
它很像 switchMap,但永远将每个值映射到同一个内部
- Observable 。
将每个源值映射成给定的 Observable :innerObservable
,而无论源值是什么,然后
将这些结果 Observables 合并到单个的 Observable ,也就是输出 Observable 。
输出 Observables 只会发出 innerObservable
实例最新发出的值。
Params:
Name | Type | Attribute | Description |
innerObservable | ObservableInput | 用来替换源 Observable 中的每个值 的 Observable 。 |
|
resultSelector | function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any |
|
函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。 传递给这个函数参数有:
|
Return:
Observable | 每次源 Observable 发出值时,该 Observable 发出来自
给定 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.switchMapTo(Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
public take(count: number): Observable<T> source
只发出源 Observable 最初发出的的N个值 (N = count
)。
接收源 Observable 最初的N个值 (N = count
),然后完成。
take
返回的 Observable 只发出源 Observable 最初发出的的N个值 (N = count
)。
如果源发出值的数量小于 count
的话,那么它的所有值都将发出。然后它便完成,无论源
Observable 是否完成。
Params:
Name | Type | Attribute | Description |
count | number | 发出 |
Return:
Observable<T> | 该 Observable 只发出源 Observable 最初发出的的N个值 (N = |
Throw:
当使用 |
Example:
var interval = Rx.Observable.interval(1000);
var five = interval.take(5);
five.subscribe(x => console.log(x));
public takeLast(count: number): Observable<T> source
只发出源 Observable 最后发出的的N个值 (N = count
)。
记住源 Observable 的最后N个值 (N = count
),然后只有当
它完成时发出这些值。
takeLast
返回的 Observable 只发出源 Observable 最后发出的的N个值 (N = count
)。
如果源发出值的数量小于 count
的话,那么它的所有值都将发出。此操作符必须等待
源 Observable 的 complete
通知发送才能在输出 Observable 上发出 next
值,
因为不这样的话它无法知道源 Observable 上是否还有更多值要发出。出于这个原因,
所有值都将同步发出,然后是 complete
通知。
Params:
Name | Type | Attribute | Description |
count | number | 从源 Observable 的值序列的末尾处,要发出的值的最大数量。 |
Return:
Observable<T> | 该 Observable 只发出源 Observable 最后发出的的N个值 (N = |
Throw:
当使用 |
Example:
var many = Rx.Observable.range(1, 100);
var lastThree = many.takeLast(3);
lastThree.subscribe(x => console.log(x));
public takeUntil(notifier: Observable): Observable<T> source
发出源 Observable 发出的值,直到 notifier
Observable 发出值。
它发出源 Observable 的值,然后直到第二个 Observable (即 notifier )发出项,它便完成。
takeUntil
订阅并开始镜像源 Observable 。它还监视另外一个 Observable,即你
提供的 notifier
。如果 notifier
发出值或 complete
通知,那么输出 Observable
停止镜像源 Observable ,然后完成。
Params:
Name | Type | Attribute | Description |
notifier | Observable | 该 Observable 第一次发出值会使 |
Example:
var interval = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = interval.takeUntil(clicks);
result.subscribe(x => console.log(x));
public takeWhile(predicate: function(value: T, index: number): boolean): Observable<T> source
发出在源 Observable 中满足 predicate
函数的每个值,并且一旦出现不满足 predicate
的值就立即完成。
只要当通过给定的条件时才接收源 Observable 的值。 当第一个不满足条件的值出现时,它便完成。
takeWhile
订阅并开始镜像源 Observable 。每个源 Observable 发出的值都会传给
predicate
函数,它会返回源值是否满足条件的布尔值。输出 Observable 会发出源值,
直到某个时间点 predicate
返回了 false,此时 takeWhile
会停止镜像源 Observable
并且完成输出 Observable 。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.takeWhile(ev => ev.clientX > 200);
result.subscribe(x => console.log(x));
public throttle(durationSelector: function(value: T): SubscribableOrPromise, config: Object): Observable<T> source
从源 Observable 中发出一个值,然后在由另一个 Observable 决定的期间内忽略 随后发出的源值,然后重复此过程。
它很像 throttleTime,但是沉默持续时间是由 第二个 Observable 决定的。
当 throttle
的内部定时器禁用时,它会在输出 Observable 上发出源 Observable 的值,
并当定时器启用时忽略源值。最开始时,定时器是禁用的。一旦第一个源值达到,它会被转发
到输出 Observable ,然后通过使用源值调用 durationSelector
函数来启动定时器,这
个函数返回 "duration" Observable 。当 duration Observable 发出值或完成时,定时器
会被禁用,并且下一个源值也是重复此过程。
Params:
Name | Type | Attribute | Description |
durationSelector | function(value: T): SubscribableOrPromise | 该函数 从源 Observable 中接收值,用于为每个源值计算沉默持续时间,并返回 Observable 或 Promise 。 |
|
config | Object | 用来定义 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttle(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
public throttleTime(duration: number, scheduler: Scheduler): Observable<T> source
从源 Observable 中发出一个值,然后在 duration
毫秒内忽略随后发出的源值,
然后重复此过程。
让一个值通过,然后在接下来的 duration
毫秒内忽略源值。
当 throttle
的内部定时器禁用时,它会在输出 Observable 上发出源 Observable 的值,
并当定时器启用时忽略源值。最开始时,定时器是禁用的。一旦第一个源值达到,它会被转发
到输出 Observable ,然后启动定时器。在 duration
毫秒(或由可选的 scheduler
内部确定的时间单位)后,定时器会被禁用,并且下一个源值也是重复此过程。可选择性地
接收一个 IScheduler 用来管理定时器。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttleTime(1000);
result.subscribe(x => console.log(x));
public timeInterval(scheduler: *): Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T> source
Params:
Name | Type | Attribute | Description |
scheduler | * |
public timeout(due: number, scheduler: Scheduler): Observable<R> | WebSocketSubject<T> | Observable<T> source
public timeoutWith(due: *, withObservable: *, scheduler: *): Observable<R> | WebSocketSubject<T> | Observable<T> source
Params:
Name | Type | Attribute | Description |
due | * | ||
withObservable | * | ||
scheduler | * |
public timestamp(scheduler: *): Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T> source
Params:
Name | Type | Attribute | Description |
scheduler | * |
public toArray(): Observable<any[]> | WebSocketSubject<T> | Observable<T> source
public toPromise(PromiseCtor: PromiseConstructor): Promise<T> source
将 Observable 序列转换为符合 ES2015 标准的 Promise 。
Params:
Name | Type | Attribute | Description |
PromiseCtor | PromiseConstructor |
|
Promise 的构造函数。如果没有提供的话,
它首先会在 |
Example:
// 使用普通的 ES2015
let source = Rx.Observable
.of(42)
.toPromise();
source.then((value) => console.log('Value: %s', value));
// => Value: 42
// 被拒的 Promise
// 使用标准的 ES2015
let source = Rx.Observable
.throw(new Error('woops'))
.toPromise();
source
.then((value) => console.log('Value: %s', value))
.catch((err) => console.log('Error: %s', err));
// => Error: Error: woops
// 通过 config 进行设置
Rx.config.Promise = RSVP.Promise;
let source = Rx.Observable
.of(42)
.toPromise();
source.then((value) => console.log('Value: %s', value));
// => Value: 42
// 通过方法进行设置
let source = Rx.Observable
.of(42)
.toPromise(RSVP.Promise);
source.then((value) => console.log('Value: %s', value));
// => Value: 42
public window(windowBoundaries: Observable<any>): Observable<Observable<T>> source
每当 windowBoundaries
发出项时,将源 Observable 的值分支成嵌套的 Observable 。
就像是 buffer, 但发出的是嵌套的 Observable ,而不是数组。
返回的 Observable 发出从源 Observable 收集到的项的窗口。 输出 Observable 发出连接的,不重叠的
窗口. 当windowBoundaries
Observable 开始发出数据,它会发出目前的窗口并且会打开一个新的。
因为每个窗口都是 Observable, 所以输出 Observable 是高阶 Observable。
Params:
Name | Type | Attribute | Description |
windowBoundaries | Observable<any> | 完成上一个窗口并且开启新窗口的 Observable。 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var result = clicks.window(interval)
.map(win => win.take(2)) // 每个窗口最多两个发送
.mergeAll(); // 打平高阶 Observable
result.subscribe(x => console.log(x));
See:
- windowCount
- windowTime
- windowToggle
- windowWhen
- buffer
public windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>> source
将源 Observable 的值分支成多个嵌套的 Observable ,每个嵌套的 Observable 最多发出 windowSize 个值。
就像是 bufferCount, 但是返回嵌套的 Observable 而不是数组。
返回的 Observable 发出从源 Observable 收集到的项的窗口。 输出 Observable 每M(M = startWindowEvery)个项发出新窗口,每个窗口包含的项数不得超过N个(N = windowSize)。 当源 Observable 完成或者遇到错误,输出 Observable 发出当前窗口并且传播从源 Observable 收到的通知。 如果没有提供 startWindowEvery ,那么在源 Observable 的起始处立即开启新窗口,并且当每个窗口的大小达到 windowSize 时完成。
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowCount(3)
.map(win => win.skip(1)) // 跳过每三个点击中的第一个
.mergeAll(); // 打平高阶 Observable
result.subscribe(x => console.log(x));
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowCount(2, 3)
.mergeAll(); // 打平高阶 Observable
result.subscribe(x => console.log(x));
See:
- window
- windowTime
- windowToggle
- windowWhen
- bufferCount
public windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>> source
将源 Observable 的值分支成嵌套的 Observable,分支策略是以 openings 发出项为起始,以 closingSelector 发出为结束。
就像是 bufferToggle, 但是发出的是嵌套 Observable 而不是数组。
返回的 Observable 发出从源 Observable 收集到的项的窗口。输出 Observable 发出窗口 ,每一个窗口
包括当 openings
发出时开始收集源 Observable 的数据项并且 closingSelector
返回的 Observable 发出项时结束收集。
Params:
Name | Type | Attribute | Description |
openings | Observable<O> | 通知开启新窗口的 observable。 |
|
closingSelector | function(value: O): Observable | 是一个接受 |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var result = clicks.windowToggle(openings, i =>
i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
).mergeAll();
result.subscribe(x => console.log(x));
See:
- window
- windowCount
- windowTime
- windowWhen
- bufferToggle
public windowWhen(closingSelector: function(): Observable): Observable<Observable<T>> source
将源 Observable 的值分支成嵌套的 Observable ,通过使用关闭 Observable 的工厂函数来决定何时开启新的窗口。
就像是 bufferWhen, 但是发出的是嵌套的 Observable 而不是数组。
返回的 Observable 发出从源 Observable 收集到的项的窗口。 输出 Observable 发出连接的,非重叠的窗口。 每当由指定的 closingSelector 函数产生的 Observable 发出项,它会发出当前窗口并开启一个新窗口。 当输出 Observable 被订阅的时候立马开启第一个窗口。
Params:
Name | Type | Attribute | Description |
closingSelector | function(): Observable | 函数,不接受参数并且返回 Observable,
该 Observable 发出信号( |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks
.windowWhen(() => Rx.Observable.interval(1000 + Math.random() * 4000))
.map(win => win.take(2)) // 每个窗口最多两个发送
.mergeAll(); // 打平高阶Observable
result.subscribe(x => console.log(x));
See:
- window
- windowCount
- windowTime
- windowToggle
- bufferWhen
public withLatestFrom(other: ObservableInput, project: Function): Observable source
结合源 Observable 和另外的 Observables 以创建新的 Observable, 该 Observable 的值由每 个 Observable 最新的值计算得出,当且仅当源发出的时候。
每当源 Observable 发出值,它会计算一个公式,此公式使用该值加上其他输入 Observable 的最新值,然后发出公式的输出结果。
withLatestFrom
结合源 Observablecombines(实例)和其他输入 Observables 的最新值,当且仅当
source 发出数据时, 可选的使用 project
函数以决定输出 Observable 将要发出的值。
在输出 Observable 发出值之前,所有的输入 Observables 都必须发出至少一个值。
Params:
Name | Type | Attribute | Description |
other | ObservableInput | 输入 Observable ,用来和源 Observable 结合。 可以传入多个输入 Observables。 |
|
project | Function |
|
将多个值合并的投射函数。顺序地接受所有 Observables 传入的值,第一个参数是源 Observable
的值。 ( |
Example:
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var result = clicks.withLatestFrom(timer);
result.subscribe(x => console.log(x));
See:
public zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T> source
Params:
Name | Type | Attribute | Description |
project | * |
public zipProto(observables: *): Observable<R> source
Params:
Name | Type | Attribute | Description |
observables | * |