Home Manual Reference Source Test Repository
import {Observable} from '@reactivex/rxjs/es6/Observable.js'
public class | source

Observable

Indirect Subclass:

AnonymousSubject, AsyncSubject, BehaviorSubject, es6/operator/windowTime.js~CountedSubject, ReplaySubject

表示在任意时间内的任意一组值。 这是 RxJS 最基本的构建块。

Test:

Static Method Summary

Static Public Methods
public static

bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable

把回调API转化为返回Observable的函数。

public static

bindNodeCallback(func: function, selector: function, scheduler: Scheduler): *

把 Node.js 式回调API转换为返回 Observable 的函数。

public static

combineLatest(observable1: ObservableInput, observable2: ObservableInput, project: function, scheduler: Scheduler): Observable

组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。

public static

concat(input1: ObservableInput, input2: ObservableInput, scheduler: Scheduler): Observable

创建一个输出 Observable,该 Observable 顺序的发出每个输入 Observable 的所有值。

public static

create(onSubscription: function(observer: Observer): TeardownLogic): Observable

创建一个新的 Observable ,当观察者( Observer )订阅该 Observable 时,它会执行指定的函数。

public static

创建一个 Observable,当被订阅的时候,调用 Observable 工厂为每个观察者创建新的 Observable。

public static

empty(scheduler: Scheduler): Observable

创建一个什么数据都不发出并且立马完成的 Observable。

public static

forkJoin(sources: *): any

public static

from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T>

从一个数组、类数组对象、Promise、迭代器对象或者类 Observable 对象创建一个 Observable.

public static

fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T>

创建一个 Observable,该 Observable 发出来自给定事件对象的指定类型事件。

public static

fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function, signal?: any): void, selector: function(...args: any): T): Observable<T>

从一个基于 addHandler/removeHandler 方法的API创建 Observable。

public static

fromPromise(promise: PromiseLike<T>, scheduler: Scheduler): Observable<T>

将 Promise 转化为 Observable。

public static

interval(period: number, scheduler: Scheduler): Observable

创建一个 Observable ,该 Observable 使用指定的 IScheduler ,并以指定时间间隔发出连续的数字。

public static

merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable

创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中值。

public static

创建一个不向观察者发出任何项的 Observable 。

public static

of(values: ...T, scheduler: Scheduler): Observable<T>

创建一个 Observable,它会依次发出由你提供的参数,最后发出完成通知。 发出你提供的参数,然后完成。

public static

range(start: number, count: number, scheduler: Scheduler): Observable

创建一个 Observable ,它发出指定范围内的数字序列。

public static

throw(将具体的: any, scheduler: Scheduler): Observable

创建一个不发送数据给观察者并且立马发出错误通知的 Observable。

public static

timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable

创建一个 Observable,该 Observable 在初始延时(initialDelay)之后开始发送并且在每个时间周期( period)后发出自增的数字。

public static

webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject

包装浏览器提供的兼容w3c的WebSocket对象.

public static

zip(observables: *): Observable<R>

将多个 Observable 组合以创建一个 Observable,该 Observable 的值是由所有输入 Observables 的值按顺序计算而来的。

Constructor Summary

Public Constructor
public

constructor(subscribe: Function)

Method Summary

Public Methods
public

An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable

public

audit(durationSelector: function(value: T): SubscribableOrPromise): Observable<T>

在另一个 Observable 决定的时间段里忽略源数据,然后发出源 Observable 中最新发出的值, 然后重复此过程。

public

auditTime(duration: number, scheduler: Scheduler): Observable<T>

duration 毫秒内忽略源值,然后发出源 Observable 的最新值, 并且重复此过程。

public

buffer(closingNotifier: Observable<any>): Observable<T[]>

缓冲源 Observable 的值直到 closingNotifier 发出。

public

bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]>

缓冲源 Observable 的值直到缓冲数量到达设定的 bufferSize.

public

bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]>

在特定时间周期内缓冲源 Observable 的值。

public

bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]>

缓冲源 Observable 的值,openings 发送的时候开始缓冲,closingSelector 发送的时候结束缓冲。

public

bufferWhen(closingSelector: function(): Observable): Observable<T[]>

缓冲源 Observable 的值, 使用关闭 Observable 的工厂函数来决定何时关闭、发出和重置缓冲区。

public

catch(selector: function): Observable

捕获 observable 中的错误,可以通过返回一个新的 observable 或者抛出错误对象来处理。

public

通过等待外部 Observable 完成然后应用 combineLatest ,将高阶 Observable 转化为一阶 Observable。

public

组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。

public

创建一个输出 Observable,它在当前 Observable 之后顺序地发出每个给定的输入 Observable 中的所有值。

public

通过顺序地连接内部 Observable,将高阶 Observable 转化为一阶 Observable 。

public

concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

将源值投射为一个合并到输出 Observable 的 Observable,以串行的方式等待前一个完成再合并下一个 Observable。

public

concatMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

将每个源值投射成同一个 Observable ,该 Observable 会以串行的方式多次合并到输出 Observable 中 。

public

count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable

计算源的发送数量,并当源完成时发出该数值。

public

debounce(durationSelector: function(value: T): SubscribableOrPromise): Observable

只有在另一个 Observable 决定的一段特定时间经过后并且没有发出另一个源值之后,才从源 Observable 中发出一个值。

public

debounceTime(dueTime: number, scheduler: Scheduler): Observable

只有在特定的一段时间经过后并且没有发出另一个源值,才从源 Observable 中发出一个值。

public

defaultIfEmpty(defaultValue: any): Observable

如果源 Observable 在完成之前没有发出任何 next 值,则发出给定的值,否则返回 Observable 的镜像。

public

delay(delay: number | Date, scheduler: Scheduler): Observable

通过给定的超时或者直到一个给定的时间来延迟源 Observable 的发送。

public

delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable

在给定的时间范围内,延迟源 Observable 所有数据项的发送,该时间段由另一个 Observable 的发送决定。

public

Notification 对象的 Observable 转换成它们所代表的发送。

public

distinct(keySelector: function, flushes: Observable): Observable

返回 Observable,它发出由源 Observable 所发出的所有与之前的项都不相同的项。

public

返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项。

public

返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项,使用通过提供的 key 访问到的属性来检查两个项是否不同。

public

do(nextOrObserver: Observer | function, error: function, complete: function): Observable

为源 Observable 上的每次发送执行副作用,但返回的 Observable 与源 Observable 是相同的。

public

elementAt(index: number, defaultValue: T): Observable

只发出单个值,这个值位于源 Observable 的发送序列中的指定 index 处。

public

every(predicate: function, thisArg: any): Observable

返回的 Observable 发出是否源 Observable 的每项都满足指定的条件。

public

当前一个内部 Observable 还未完成的情况下,通过丢弃内部 Observable 使得 高阶 Observable 转换成一阶 Observable。

public

exhaustMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

将每个源值投射成 Observable,只有当前一个投射的 Observable 已经完成, 这个 Observable 才会被合并到输出 Observable 中。

public

expand(project: function(value: T, index: number), concurrent: number): Observable

递归地将每个源值投射成 Observable,这个 Observable 会被合并到输出 Observable 中。

public

filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable

通过只发送源 Observable 的中满足指定 predicate 函数的项来进行过滤。

public

find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T>

只发出源 Observable 所发出的值中第一个满足条件的值。

public

findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T>

只发出源 Observable 所发出的值中第一个满足条件的值的索引。

public

first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R>

只发出由源 Observable 所发出的值中第一个(或第一个满足条件的值)。

public

forEach(next: Function, PromiseCtor: PromiseConstructor): Promise

public

groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>>

根据指定条件将源 Observable 发出的值进行分组,并将这些分组作为 GroupedObservables 发出,每一个分组都是一个 GroupedObservable

public

忽略源 Observable 所发送的所有项,只传递 completeerror 的调用。

public

如果源 Observable 是空的话,它返回一个发出 true 的 Observable,否则发出 false 。

public

last(predicate: function): Observable

返回的 Observable 只发出由源 Observable 发出的最后一个值。它可以接收一个可选的 predicate 函数作为 参数,如果传入 predicate 的话则发送的不是源 Observable 的最后一项,而是发出源 Observable 中 满足 predicate 函数的最后一项。

public

letProto(func: *): Observable<R>

public

lift(operator: Operator): Observable

创建一个新的 Observable,以它作为源,并传递操作符的定义作为新的 observable 操作符。

public

map(project: function(value: T, index: number): R, thisArg: any): Observable<R>

将给定的 project 函数应用于源 Observable 发出的每个值,并将结果值作为 Observable 发出。

public

mapTo(value: any): Observable

每次源 Observble 发出值时,都在输出 Observable 上发出给定的常量值。

public

表示源 Observable 中的所有通知,每个通知都会在 Notification 对象中标记为 它们原始的通知类型,并会作为输出 Observable 的 next 通知。

public

max(comparer: Function): Observable

max 操作符操作的 Observable 发出数字(或可以与提供的函数进行比较的项)并且当源 Observable 完成时它发出单一项:最大值的项。

public

merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable

创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中的所有值。

public

mergeAll(concurrent: number): Observable

将高阶 Observable 转换成一阶 Observable ,一阶 Observable 会同时发出在内部 Observables 上发出的所有值。

public

mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

将每个源值投射成 Observable ,该 Observable 会合并到输出 Observable 中。

public

mergeMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

将每个源值投射成同一个 Observable ,该 Observable 会多次合并到输出 Observable 中。

public

mergeScan(accumulator: function(acc: R, value: T): Observable<R>, seed: *, concurrent: number): Observable<R>

在源 Observable 上应用 accumulator 函数,其中 accumulator 函数本身返回 Observable ,然后每个返回的中间 Observable 会被合并到输出 Observable 中。

public

min(comparer: Function): Observable<R>

min 操作符操作的 Observable 发出数字(或可以使用提供函数进行比较的项)并且当源 Observable 完成时它发出单一项:最小值的项。

public

multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable

返回的 Observable 发出对 ConnectableObservable 发出的项调用一个指定的 selector 函数的结果, ConnectableObservable 可以在潜在的多个流之间共享单个 subscription 。

public

observeOn(scheduler: IScheduler, delay: number): Observable<T>

使用指定的调度器来重新发出源 Observable 的所有通知。

public

当任何提供的 Observable 发出完成或错误通知时,它会立即地订阅已传入下一个 Observable 。

public

将一系列连续的发送成对的组合在一起,并将这些分组作为两个值的数组发出。

public

partition(predicate: function(value: T, index: number): boolean, thisArg: any): [Observable<T>, Observable<T>]

将源 Observable 一分为二,一个是所有满足 predicate 函数的值,另一个是所有 不满足 predicate 的值。

public

pluck(properties: ...string): Observable

将每个源值(对象)映射成它指定的嵌套属性。

public

publish(selector: Function): *

返回 ConnectableObservable,它是 Observable 的变种,它会一直等待,直到 connnect 方法被调用才会开始把值发送给那些订阅它的观察者。

public
public
public

publishReplay(bufferSize: *, windowTime: *, scheduler: *): ConnectableObservable<T>

public

返回 Observable,该 Observable 是源 Observable 和提供的 Observables 的组合中 第一个发出项的 Observable 的镜像。

public

reduce(accumulator: function(acc: R, value: T, index: number): R, seed: R): Observable<R>

在源 Observalbe 上应用 accumulator (累加器) 函数,然后当源 Observable 完成时,返回 累加的结果,可以提供一个可选的 seed 值。

public

返回的 Observable 重复由源 Observable 所发出的项的流,最多可以重复 count 次。

public

repeatWhen(notifier: function(notifications: Observable): Observable): Observable

返回的 Observalbe 是源 Observable 的镜像,除了 complete 。如果源 Observable 调用了 complete,这个方法会发出给 notifier 返回的 Observable 。如果这个 Observale 调用了 completeerror,那么这个方法会在子 subscription 上调用 completeerror 。否则,此方法将重新订阅源 Observable。

public

返回一个 Observable, 该 Observable 是源 Observable 不包含错误异常的镜像。 如果源 Observable 发生错误, 这个方法不会传播错误而是会不 断的重新订阅源 Observable 直到达到最大重试次数 (由数字参数指定)。

public

返回一个 Observable, 该 Observable 是源 Observable 不包含错误异常的镜像。 如果源头 Observable 触发 error, 这个方法会发出引起错误的 Throwable 给 notifier 返回的 Observable。 如果该 Observable 触发 complete 或者 error 则该方法会使子订阅触发 completeerror。 否则该方法会重新订阅源 Observable。

public

sample(notifier: Observable<any>): Observable<T>

发出源 Observable 最新发出的值当另一个 notifier Observable发送时。

public

sampleTime(period: number, scheduler: Scheduler): Observable<T>

在周期时间间隔内发出源 Observable 发出的最新值。

public

scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R>

对源 Observable 使用累加器函数, 返回生成的中间值, 可选的初始值。

public

sequenceEqual(compareTo: Observable, comparor: function): Observable

使用可选的比较函数,按顺序比较两个 Observables 的所有值,然后返回单个布尔值的 Observable, 以表示两个序列是否相等。 按顺序检查两个 Observables 所发出的所有值是否相等。

public

返回一个新的 Observable,该 Observable 多播(共享)源 Observable。 至少要有一个订阅者,该 Observable 才会被订阅并发出数据。 当所有的订阅者都取消订阅了,它会取消对源 Observable 的订阅。 因为 Observable 是多路传播的它使得流是 hot。 它是 `.publish().refCount()` 的别名。

public

shareReplay(bufferSize: *, windowTime: *, scheduler: *): *

public

single(predicate: Function): Observable<T>

该 Observable 发出源 Observable 所发出的值中匹配指定 predicate 函数的单个项。 如果源 Observable 发出多于1个数据项或者没有发出数据项, 分别以 IllegalArgumentException 和 NoSuchElementException 进行通知。

public

返回一个 Observable, 该 Observable 跳过源 Observable 发出的前N个值(N = count)。

public

skipLast(count: number): Observable<T>

跳过源 Observable 最后发出的的N个值 (N = count)。

public

返回一个 Observable,该 Observable 会跳过源 Observable 发出的值直到第二个 Observable 开始发送。

public

skipWhile(predicate: Function): Observable<T>

返回一个 Observable, 该 Observable 会跳过由源 Observable 发出的所有满足指定条件的数据项, 但是一旦出现了不满足条件的项,则发出在此之后的所有项。

public

startWith(values: ...T, scheduler: Scheduler): Observable

返回的 Observable 会先发出作为参数指定的项,然后再发出由源 Observable 所发出的项。

public

subscribe(observerOrNext: Observer | Function, error: Function, complete: Function): ISubscription

调用 Observable 的执行并注册 Observer 的处理器以便于发出通知。

public

subscribeOn(scheduler: Scheduler): Observable<T>

使用指定的 IScheduler 异步地订阅此 Observable 的观察者。

public

通过只订阅最新发出的内部 Observable ,将高阶 Observable 转换成一阶 Observable 。

public

switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

将每个源值投射成 Observable,该 Observable 会合并到输出 Observable 中, 并且只发出最新投射的 Observable 中的值。

public

switchMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

将每个源值投射成同一个 Observable ,该 Observable 会使用 switch 多次被打平 到输出 Observable 中。

public

take(count: number): Observable<T>

只发出源 Observable 最初发出的的N个值 (N = count)。

public

takeLast(count: number): Observable<T>

只发出源 Observable 最后发出的的N个值 (N = count)。

public

发出源 Observable 发出的值,直到 notifier Observable 发出值。

public

takeWhile(predicate: function(value: T, index: number): boolean): Observable<T>

发出在源 Observable 中满足 predicate 函数的每个值,并且一旦出现不满足 predicate 的值就立即完成。

public

throttle(durationSelector: function(value: T): SubscribableOrPromise, config: Object): Observable<T>

从源 Observable 中发出一个值,然后在由另一个 Observable 决定的期间内忽略 随后发出的源值,然后重复此过程。

public

throttleTime(duration: number, scheduler: Scheduler): Observable<T>

从源 Observable 中发出一个值,然后在 duration 毫秒内忽略随后发出的源值, 然后重复此过程。

public

timeInterval(scheduler: *): Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T>

public

timeout(due: number, scheduler: Scheduler): Observable<R> | WebSocketSubject<T> | Observable<T>

public

timeoutWith(due: *, withObservable: *, scheduler: *): Observable<R> | WebSocketSubject<T> | Observable<T>

public

timestamp(scheduler: *): Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T>

public
public

toPromise(PromiseCtor: PromiseConstructor): Promise<T>

将 Observable 序列转换为符合 ES2015 标准的 Promise 。

public

window(windowBoundaries: Observable<any>): Observable<Observable<T>>

每当 windowBoundaries 发出项时,将源 Observable 的值分支成嵌套的 Observable 。

public

windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>>

将源 Observable 的值分支成多个嵌套的 Observable ,每个嵌套的 Observable 最多发出 windowSize 个值。

public

windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>>

将源 Observable 的值分支成嵌套的 Observable,分支策略是以 openings 发出项为起始,以 closingSelector 发出为结束。

public

windowWhen(closingSelector: function(): Observable): Observable<Observable<T>>

将源 Observable 的值分支成嵌套的 Observable ,通过使用关闭 Observable 的工厂函数来决定何时开启新的窗口。

public

结合源 Observable 和另外的 Observables 以创建新的 Observable, 该 Observable 的值由每 个 Observable 最新的值计算得出,当且仅当源发出的时候。

public

zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T>

public

zipProto(observables: *): Observable<R>

Static Public Methods

public static bindCallback(func: function, selector: function, scheduler: Scheduler): function(...params: *): Observable source

把回调API转化为返回Observable的函数。

给它一个签名为f(x, callback)的函数f,返回一个函数g, 调用'g(x)'的时候会返回一个 Observable。

bindCallback 并不是一个操作符,因为它的输入和输出并不是 Observable 。输入的是一个 带有多个参数的函数,并且该函数的最后一个参数必须是个回调函数,当该函数执行完之后会调用回调函数。

bindCallback 的输出是一个函数,该函数接受的参数和输入函数一样(除了没有最后一个回调函 数)。当输出函数被调用,会返回一个 Observable 。如果输入函数给回调函数传递一个值,则该 Observable 会发出这个值。如果输入函数给回调函数传递多个值,则该 Observable 会发出一个包含所有值的数组。

很重要的一点是,输出函数返回的 Observable 被订阅之前,输入函数是不会执行的。这意味着如果输入 函数发起 AJAX 请求,那么该请求在每次订阅返回的 Observable 之后才会发出,而不是之前。

作为一个可选项,selector 函数可以传给bindObservable。该函数接受和回调一样的参数。返回 Observable 发出的值,而不是回调参数本身,即使在默认情况下,传递给回调的多个参数将在流中显示为数组。选择器 函数直接用参数调用,就像回调一样。这意味着你可以想象默认选择器(当没有显示提供的时候)是这样 一个函数:将它的所有参数聚集到数组中,或者仅仅返回第一个参数(当只有一个参数的时候)。

最后一个可选参数 - Scheduler - 当 Observable 被订阅的时候,可以用来控制调用输入函 数以及发出结果的时机。默认订阅 Observable 后调用输入函数是同步的,但是使用Scheduler.async 作为最后一个参数将会延迟输入函数的调用,就像是用0毫秒的setTimeout包装过。所以如果你使用了异 步调度器并且订阅了 Observable ,当前正在执行的所有函数调用,将在调用“输入函数”之前结束。

当涉及到传递给回调的结果时,默认情况下当输入函数调用回调之后会立马发出,特别是如果回调也是同步调动的话, 那么 Observable 的订阅也会同步调用next方法。如果你想延迟调用,使用Scheduler.async。 这意味着通过使用Scheduler.async,你可以确保输入函数永远异步调用回调函数,从而避免了可怕的Zalgo。

需要注意的是,输出函数返回的Observable只能发出一次然后完成。即使输入函数多次调用回调函数,第二次 以及之后的调用都不会出现在流中。如果你需要监听多次的调用,你大概需要使用fromEvent或者 fromEventPattern来代替。

如果输入函数依赖上下文(this),该上下文将被设置为输出函数在调用时的同一上下文。特别是如果输入函数 被当作是某个对象的方法进行调用,为了保持同样的行为,建议将输出函数的上下文设置为该对象,输入方法不 是已经绑定好的。

如果输入函数以 node 的方式(第一个参数是可选的错误参数用来标示调用是否成功)调用回调函数,bindNodeCallback 提供了方便的错误处理,也许是更好的选择。 bindCallback 不会区别对待这些方法,错误参数(是否传递) 被解释成正常的参数。

Params:

NameTypeAttributeDescription
func function

最后一个参数是回调的函数。

selector function
  • optional

选择器,从回调函数中获取参数并将这些映射为一个 Observable 发出的值。

scheduler Scheduler
  • optional

调度器,调度回调函数。

Return:

function(...params: *): Observable

一个返回Observable的函数,该Observable发出回调函数返回的数据。

Example:

把jQuery的getJSON方法转化为Observable API
// 假设我们有这个方法:jQuery.getJSON('/my/url', callback)
var getJSONAsObservable = Rx.Observable.bindCallback(jQuery.getJSON);
var result = getJSONAsObservable('/my/url');
result.subscribe(x => console.log(x), e => console.error(e));
接收传递给回调的数组参数。
someFunction((a, b, c) => {
  console.log(a); // 5
  console.log(b); // 'some string'
  console.log(c); // {someProperty: 'someValue'}
});

const boundSomeFunction = Rx.Observable.bindCallback(someFunction);
boundSomeFunction().subscribe(values => {
  console.log(values) // [5, 'some string', {someProperty: 'someValue'}]
});
使用带 selector 函数的 bindCallback。
someFunction((a, b, c) => {
  console.log(a); // 'a'
  console.log(b); // 'b'
  console.log(c); // 'c'
});

const boundSomeFunction = Rx.Observable.bindCallback(someFunction, (a, b, c) => a + b + c);
boundSomeFunction().subscribe(value => {
  console.log(value) // 'abc'
});
对使用和不使用 async 调度器的行为进行比较。
function iCallMyCallbackSynchronously(cb) {
  cb();
}

const boundSyncFn = Rx.Observable.bindCallback(iCallMyCallbackSynchronously);
const boundAsyncFn = Rx.Observable.bindCallback(iCallMyCallbackSynchronously, null, Rx.Scheduler.async);

boundSyncFn().subscribe(() => console.log('I was sync!'));
boundAsyncFn().subscribe(() => console.log('I was async!'));
console.log('This happened...');

// Logs:
// I was sync!
// This happened...
// I was async!
在对象方法上使用 bindCallback
const boundMethod = Rx.Observable.bindCallback(someObject.methodWithCallback);
boundMethod.call(someObject) // 确保methodWithCallback可以访问someObject
.subscribe(subscriber);

Test:

See:

public static bindNodeCallback(func: function, selector: function, scheduler: Scheduler): * source

把 Node.js 式回调API转换为返回 Observable 的函数。

就像是 bindCallback, 但是回调函数必须形如 callback(error, result)这样

bindNodeCallback 并不是一个操作符,因为它的输入和输出并不是 Observable。输入的是一个 带有多个参数的函数,并且该函数的最后一个参数必须是个回调函数,当该函数执行完之后会掉 用回调函数,回调函数被要求遵循 Node.js 公约,其中第一个参数是错误对象,标示调用是否成功。 如果这个错误对象被传递给了回调函数,这意味着调用出现了错误。

bindNodeCallback 的输出是一个函数,该函数接受的参数和输入函数一样(除了没有最后一个回调函 数)。当输出函数被调用,会返回一个 Observable 。如果输入函数带着错误对象调用回调函数,Observable 也会用这个错误对象触发错误状态。如果错误对象没有被传递,Observable 会发出第二个参数。 如果输入函数给回调函数传递三个或者更多的值,该 Observable 会发出一个包含除了第一个错误参 数的所有值的数组。

bindNodeCallback接受可选的选择器函数,它允许 Observable 发出由选择器计算的值,而 不是普通的回调参数。这和bindCallback的选择器效果类似,但是 node 式的错误参数永远 不会传递给该函数。

注意,输入函数永远不会被调用直到输出函数返回的 Observable 被订阅。默认情况下,订阅后会同步调用 输入方法, 但是这可以被改变,通过使用Scheduler作为可选的第三个参数。 调度器可以控制 Observable 何时发出数据。想要获取更多信息,请查看bindCallback的文档, 工作原理完全一样。

bindCallback一样,输入函数的上下文(this)将会被设置给输出函数的上下文,当它被调用 的时候。当 Observable 发出了数据后,它会立马完成。这意味着即使输入函数再次调用回调函数,第二次以 及后续调用的值永远不会出现在流中。如果你需要处理多次调用,查看fromEvent或者fromEventPattern 来替代。

注意,bindNodeCallback同样可以用在非 Node.js 环境中,Node.js 式回调函数仅仅是一种公约,所以 如果你的目标环境是浏览器或者其他,并且你使用的API遵守了这种回调公约,bindNodeCallback就可以 安全的使用那些API函数。

牢记,传递给回调的错误对象并不是 JavaScript 内置的 Error 的实例。事实上,它甚至可以不是对象。 回调函数的错误参数被解读为“存在”,当该参数有值的时候。它可以是,例如,非0数字,非空字符串,逻辑 是。在所有这些情况下,都会触发 Observable 的错误状态。这意味着当使用bindNodeCallback 的时候通常形式的回调函数都会触发失败。如果你的 Observable 经常发生你预料之外的错误,请检查下 回调函数是否是 node.js 式的回调,如果不是,请使用bindCallback替代。

注意,即使错误参数出现在回调函数中,但是它的值是假值,它仍然不会出现在Observable的发出数组或者选择器中。

Params:

NameTypeAttributeDescription
func function

最后一个参数是 node.js 式回调的函数。

selector function
  • optional

选择器,从回调函数中获取参数并将这些映射为一个 Observable 发出的值。

scheduler Scheduler
  • optional

调度器,调度回调函数。

Return:

*

{function(...params: *): 一个返回 Observable 的函数,该 Observable 发出 node.js 式回调函数返回的数据。

Example:

从文件系统中读取文件并且从 Observable 中获取数据。
import * as fs from 'fs';
var readFileAsObservable = Rx.Observable.bindNodeCallback(fs.readFile);
var result = readFileAsObservable('./roadNames.txt', 'utf8');
result.subscribe(x => console.log(x), e => console.error(e));
使用具有多个参数的函数调用回调
someFunction((err, a, b) => {
  console.log(err); // null
  console.log(a); // 5
  console.log(b); // "some string"
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction);
boundSomeFunction()
.subscribe(value => {
  console.log(value); // [5, "some string"]
});
使用选择器函数
someFunction((err, a, b) => {
  console.log(err); // undefined
  console.log(a); // "abc"
  console.log(b); // "DEF"
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction, (a, b) => a + b);
boundSomeFunction()
.subscribe(value => {
  console.log(value); // "abcDEF"
});
非 node.js 式的回调函数
someFunction(a => {
  console.log(a); // 5
});
var boundSomeFunction = Rx.Observable.bindNodeCallback(someFunction);
boundSomeFunction()
.subscribe(
  value => {}             // never gets called
  err => console.log(err) // 5
);

Test:

See:

public static combineLatest(observable1: ObservableInput, observable2: ObservableInput, project: function, scheduler: Scheduler): Observable source

组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。

它将使用所有输入中的最新值计算公式,然后发出该公式的输出。

combineLatest 结合所有输入 Observable 参数的值. 顺序订阅每个 Observable, 每当任一输入 Observable 发出,收集每个输入 Observable 的最新值组成一个数组。所以,当你给操作符 传入 n 个 Observable,返回的 Observable 总是会发出一个长度为 n 的数组,对应输入 Observable 的顺序(第一个 Observable 的值放到数组的第一个)。

静态版本的 combineLatest 接受一个 Observables 数组或者每个 Observable 可以直接作为参数。 请注意,如果你事先不知道你将要结合多少个 Observable, 那么 Observables 数组是一个好的选择。 传递空的数组将会导致返回 Observable 立马完成。

为了保证输出数组的长度相同,combineLatest 实际上会等待所有的输入 Observable 至少发出一次, 在返回 Observable 发出之前。这意味着如果某个输入 Observable 在其余的输入 Observable 之前发出,它所发出 的值只保留最新的。另一方面,如果某个输入 Observable 没有发出值就完成了,返回 Observable 也不会发 射值并立马完成,因为不可能从已经完成的 Observable 中收集到值。同样的,如果某个输入 Observable 不发出值也不完成,combineLatest会永远不发出值也不结束。所以,再次强调下,它会等待所有的流 去发出值。

如果给combineLatest至少传递一个输入 Observable 并且所有传入的输入 Observable 都发出了值,返回 Observable 将会在所有结合流完成后完成。所以即使某些 Observable 完成了,当其他输入 Observable 发出值的时候,combineLatest返回 Observable 仍然会发出值。对于完成的输入 Observable,它 的值一直是最后发出的值。另一方面,如果任一输入 Observable 发生错误,combineLatest也会 立马触发错误状态,所有的其他输入 Observable 都会被解除订阅。

combineLatest接受一个可选的参数投射函数,它接受返回 Observable 发出的值。投射函数 可以返回任何数据,这些数据代替默认的数组被返回 Observable 发出。需要注意的是,投射函数并不接 受值的数组,而是值本身。这意味着默认的投射函数就是一个接受所有参数并把它们放到一个数组里面的 函数。

Params:

NameTypeAttributeDescription
observable1 ObservableInput

用来和其他 Observables 进行结合的输入 Observable 。

observable2 ObservableInput

用来和其他 Observables 进行结合的输入 Observable 。 可以有多个输入Observables传入或者第一个参数是Observables数组

project function
  • optional

投射成输出 Observable 上的一个新的值。

scheduler Scheduler
  • optional
  • default: null

用来订阅每个输入 Observable 的调度器。

Return:

Observable

该 Observable 为每个输入 Observable 的最新值的投射,或者每个输入 Observable 的最新值的数组。

Example:

结合两个 timer Observables
const firstTimer = Rx.Observable.timer(0, 1000); // 从现在开始,每隔1秒发出0, 1, 2...
const secondTimer = Rx.Observable.timer(500, 1000); // 0.5秒后,每隔1秒发出0, 1, 2...
const combinedTimers = Rx.Observable.combineLatest(firstTimer, secondTimer);
combinedTimers.subscribe(value => console.log(value));
// Logs
// [0, 0] after 0.5s
// [1, 0] after 1s
// [1, 1] after 1.5s
// [2, 1] after 2s
结合 Observables 数组
const observables = [1, 5, 10].map(
  n => Rx.Observable.of(n).delay(n * 1000).startWith(0) // 先发出0,然后在 n 秒后发出 n。
);
const combined = Rx.Observable.combineLatest(observables);
combined.subscribe(value => console.log(value));
// 日志
// [0, 0, 0] 立刻
// [1, 0, 0] 1s 后
// [1, 5, 0] 5s 后
// [1, 5, 10] 10s 后
使用 project 函数动态计算体重指数
var weight = Rx.Observable.of(70, 72, 76, 79, 75);
var height = Rx.Observable.of(1.76, 1.77, 1.78);
var bmi = Rx.Observable.combineLatest(weight, height, (w, h) => w / (h * h));
bmi.subscribe(x => console.log('BMI is ' + x));

// 控制台输出:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222

Test:

See:

public static concat(input1: ObservableInput, input2: ObservableInput, scheduler: Scheduler): Observable source

创建一个输出 Observable,该 Observable 顺序的发出每个输入 Observable 的所有值。

连接多个输入 Observable,顺序的发出它们的值,一个 Observable 接一个 Observable。

concat通过一次订阅一个将多个 Observables 连接起来,并将值合并到输出 Observable 中。 你可以传递一个输入 Observable 数组,或者直接把它们当做参数传递。 传递一个空数组会 导致输出 Observable 立马触发完成状态。

concat会订阅第一个输入 Observable 并且发出它的所有值, 不去做任何干预。 当这个 输入 Observable 完成时, 订阅第二个输入 Observable,同样的发出它的所有值。这个过 程会不断重复直到输入 Observable 都用过了。当最后一个输入 Observable 完成时,concat 也会完成。 任何时刻都只会有一个输入 Observable 发出值。 如果你想让所有的输入 Observable 并行发出数据,请查看merge, 特别的带上concurrent参数。 事实上,concatconcurrent设置为1的merge效果是一样的。

注意,如果输入 Observable 一直都不完成, concat 也会一直不能完成并且下一个输入 Observable 将永远不能被订阅. 另一方面, 如果某个输入 Observable 在它被订阅后立马处于完成状态, 那么它对 concat是不可见的, 仅仅会转向下一个输入 Observable.

如果输入 Observable 链中的任一成员发生错误, concat会立马触发错误状态,而不去控制下一个输入 Observable. 发生错误的输入 Observable 之后的输入 Observable 不会被订阅.

如果你将同一输入 Observable 传递给concat多次,结果流会在每次订阅的时候“重复播放”, 这意味着 你可以重复 Observable 多次. 如果你乏味的给concat传递同一输入 Observable 1000次,你可以试着 用用repeat.

Params:

NameTypeAttributeDescription
input1 ObservableInput

等待被连接的输入 Observable。

input2 ObservableInput

等待被连接的输入 Observable。 可以传递多个输入Observable.

scheduler Scheduler
  • optional
  • default: null

可选的调度器,调度每个 Observable 的订阅。

Return:

Observable

有序的、串行的将所有输入 Observable 的值合并到单一的输出 Observable。

Example:

将从0数到3的定时器和从1到10的同步序列进行连接
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = Rx.Observable.concat(timer, sequence);
result.subscribe(x => console.log(x));

// results in:
// 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
连接3个 Observables
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var result = Rx.Observable.concat([timer1, timer2, timer3]); // note that array is passed
result.subscribe(x => console.log(x));

// results in the following:
// (Prints to console sequentially)
// -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
// -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
// -500ms-> 0 -500ms-> 1 -500ms-> ... 9
连接同一个 Observable 多次
const timer = Rx.Observable.interval(1000).take(2);

Rx.Observable.concat(timer, timer) // concating the same Observable!
.subscribe(
  value => console.log(value),
  err => {},
  () => console.log('...and it is done!')
);

// Logs:
// 0 after 1s
// 1 after 2s
// 0 after 3s
// 1 after 4s
// "...and it is done!" also after 4s

See:

public static create(onSubscription: function(observer: Observer): TeardownLogic): Observable source

创建一个新的 Observable ,当观察者( Observer )订阅该 Observable 时,它会执行指定的函数。

创建自定义的 Observable ,它可以做任何你想做的事情

createonSubscription 函数转化为一个实际的 Observable 。每当有人订阅该 Observable 的 时候,onSubscription函数会接收 Observer 实例作为唯一参数执行。onSubscription 应该 调用观察者对象的 next, errorcomplete 方法。

带值调用 next 会将该值发出给观察者。调用 complete 意味着该 Observable 结束了发出并且不会做任何事情了。 调用 error 意味着出现了错误,传给 error 的参数应该提供详细的错误信息。

一个格式良好的 Observable 可以通过 next 方法发出任意多个值,但是 completeerror 方法只能被调用 一次,并且调用之后不会再调用任何方法。 如果你试图在 Observable 已经完成或者发生错误之后调用nextcompleteerror 方法,这些调用将会被忽略,以保护所谓的 Observable 合同。注意,你并不需要一定要在某个时刻 调用 complete 方法,创建一个不会被终止的 Observable 也是完全可以的,一切取决于你的需求。

onSubscription 可以选择性的返回一个函数或者一个拥有 unsubscribe 方法的对象。 当要取消对 Observable 的订阅时,函数或者方法将会被调用,清理所有的资源。比如说,如果你在自己的 Observable 里面使用了 setTimeout, 当有人要取消订阅的时候, 你可以清理定时器, 这样就可以减少不必要的触发,并且浏览 器(或者其他宿主环境)也不用将计算能力浪费在这种无人监听的定时事件上。

绝大多数情况下你不需要使用 create,因为现有的操作符创建出来的 Observable 能满足绝大多数使用场景。这也就意味着, create 是允许你创建任何 Observable 的底层机制,如果你有非常特殊的需求的话,可以使用它。

TypeScript 签名问题

因为 Observable 继承的类已经定义了静态 create 方法,但是签名不同, 不可能给 Observable.create 合适的签名。 正因为如此,给 create 传递的函数将不会进行类型检查,除非你明确指定了特定的签名。

当使用 TypeScript 时,我们建议将传递给 create 的函数签名声明为(observer: Observer) => TeardownLogic, 其中ObserverTeardownLogic 是库提供的接口。

Params:

NameTypeAttributeDescription
onSubscription function(observer: Observer): TeardownLogic

该函数接受一个观察者, 然后在适当的时机调用观察者的 nexterror 或者 complete 方法,也可以返回一些清理资源的逻辑。

Return:

Observable

Observable, 当该 Observable 被订阅的时候将会执行特定函数。

Example:

发出三个数字,然后完成。
var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});
observable.subscribe(
  value => console.log(value),
  err => {},
  () => console.log('this is the end')
);

// 日志:
// 1
// 2
// 3
// "this is the end"
发出一个错误
const observable = Rx.Observable.create((observer) => {
  observer.error('something went really wrong...');
});

observable.subscribe(
  value => console.log(value), // 永远不会被调用
  err => console.log(err),
  () => console.log('complete') // 永远不会被调用
);

// 日志:
// "something went really wrong..."
返回取消订阅函数

const observable = Rx.Observable.create(observer => {
  const id = setTimeout(() => observer.next('...'), 5000); // 5s后发出数据

  return () => { clearTimeout(id); console.log('cleared!'); };
});

const subscription = observable.subscribe(value => console.log(value));

setTimeout(() => subscription.unsubscribe(), 3000); // 3s后取消订阅

// 日志:
// "cleared!" after 3s

// Never logs "..."

See:

public static defer(observableFactory: function(): SubscribableOrPromise): Observable source

创建一个 Observable,当被订阅的时候,调用 Observable 工厂为每个观察者创建新的 Observable。

惰性创建 Observable, 也就是说, 当且仅当它被订阅的时候才创建。

defer允许你创建一个 Observable 当且仅当它被订阅的时候,并且为每个订阅者创建新的 Observable。 它一直在等待直到观察者订阅了它, 然后它创建一个新的 Observable,通常会以 Observable 工厂函数的方式。 对每个订阅者它都是新的, 所以即使每个订阅者也许会认为它们订阅的是同一个 Observable, 事实上每个订阅 者获得的是只属于它们的 Observable。

Params:

NameTypeAttributeDescription
observableFactory function(): SubscribableOrPromise

Observable 的工 厂函数,它会在每个 Observer 订阅 Observable 的时候被触发调用. 也可以返回一个 Promise, Promise 将会立刻被转 化为 Observable。

Return:

Observable

Observable,该 Observable 的观察者的订阅会触发对 Observable 工厂函数的调用。

Example:

随机订阅点击或者 interval Observable
var clicksOrInterval = Rx.Observable.defer(function () {
  if (Math.random() > 0.5) {
    return Rx.Observable.fromEvent(document, 'click');
  } else {
    return Rx.Observable.interval(1000);
  }
});
clicksOrInterval.subscribe(x => console.log(x));

// 结果如下:
// 如果Math.random()返回的值大于0.5,它会监听"document"上的点击事件; 当document
// 被点击,它会将点击事件对象打印到控制台。 如果结果小于0.5它会每秒发出一个从0开始自增数。

Test:

See:

public static empty(scheduler: Scheduler): Observable source

创建一个什么数据都不发出并且立马完成的 Observable。

仅仅发出 complete 通知,其他什么也不做。

这个静态操作符对于创建一个简单的只发出完成状态通知的 Observable 是非常有用的。 它可以被用来和 其他 Observables 进行组合, 比如在 mergeMap 中使用。

Params:

NameTypeAttributeDescription
scheduler Scheduler
  • optional

调度器 ( IScheduler ), 用来调度完成通知。

Return:

Observable

空的Observable: 仅仅发出完成通知。

Example:

发出数字7, 然后完成。
var result = Rx.Observable.empty().startWith(7);
result.subscribe(x => console.log(x));
仅将奇数映射并打平成字母序列abc。
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
  x % 2 === 1 ? Rx.Observable.of('a', 'b', 'c') : Rx.Observable.empty()
);
result.subscribe(x => console.log(x));

// 结果如下:
// x 是间隔的计数比如:0,1,2,3,...
// x 1000ms 出现一次
// 如果 x % 2 等于 1 打印 abc
// 如果 x % 2 不等于1 什么也不输出

Test:

See:

public static forkJoin(sources: *): any source

Params:

NameTypeAttributeDescription
sources *

Return:

any

Test:

public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T> source

从一个数组、类数组对象、Promise、迭代器对象或者类 Observable 对象创建一个 Observable.

几乎可以把任何东西都能转化为Observable.

将各种其他对象和数据类型转化为 Observables。 from将 Promise、类数组对象、迭代器对象 (https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#iterable) 转化为 Observable ,该 Observable 发出 promise、数组或者迭代器的成员。 字符串,在这种上下文里,会被当做字符数组。 类 Observable 对象(包括ES2015里的 Observable 方法)也可以通过此操作符进行转换。

Params:

NameTypeAttributeDescription
ish ObservableInput<T>

一个可以被订阅的对象, Promise, 类 Observable, 数组, 迭代器或者类数组对象可以被转化。

scheduler Scheduler
  • optional

调度器,用来调度值的发送。

Return:

Observable<T>

Observable的值来自于输入对象的转化。

Example:

将数组转化为 Observable
var array = [10, 20, 30];
var result = Rx.Observable.from(array);
result.subscribe(x => console.log(x));

// 结果如下:
// 10 20 30
将一个无限的迭代器(来自于 generator)转化为 Observable。
function* generateDoubles(seed) {
  var i = seed;
  while (true) {
    yield i;
    i = 2 * i; // double it
  }
}

var iterator = generateDoubles(3);
var result = Rx.Observable.from(iterator).take(10);
result.subscribe(x => console.log(x));

// Results in the following:
// 3 6 12 24 48 96 192 384 768 1536

Test:

See:

public static fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T> source

创建一个 Observable,该 Observable 发出来自给定事件对象的指定类型事件。

创建一个来自于 DOM 事件,或者 Node 的 EventEmitter 事件或者其他事件的 Observable。

通过给“事件目标”添加事件监听器的方式创建 Observable,可能会是拥有addEventListenerremoveEventListener方法的对象,一个 Node.js 的 EventEmitter,一个 jQuery 式的 EventEmitter, 一个 DOM 的节点集合, 或者 DOM 的 HTMLCollection。 当输出 Observable 被订阅的时候事件处理函数会被添加, 当取消订阅的时候会将事件处理函数移除。

Params:

NameTypeAttributeDescription
target EventTargetLike

DOMElement, 事件目标, Node.js EventEmitter, NodeList 或者 HTMLCollection 等附加事件处理方法的对象。

eventName string

感兴趣的事件名称, 被 target 发出。

options EventListenerOptions
  • optional

可选的传递给 addEventListener 的参数。

selector SelectorMethodSignature<T>
  • optional

可选的函数处理结果. 接收事件处理函数的参数,应该返回单个值。

Return:

Observable<T>

Example:

发出 DOM document 上的点击事件。
var clicks = Rx.Observable.fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));

// 结果:
// 每次点击 document 时,都会在控制台上输出 MouseEvent 。

Test:

See:

public static fromEventPattern(addHandler: function(handler: Function): any, removeHandler: function(handler: Function, signal?: any): void, selector: function(...args: any): T): Observable<T> source

从一个基于 addHandler/removeHandler 方法的API创建 Observable。

将任何 addHandler/removeHandler 的API转化为 Observable。

创建 Observable ,该 Observable 通过使用addHandlerremoveHandler添加和删除事件处理器, 使用可选的选择器函数将事件参数转化为结果. addHandler当输出 Observable 被订阅的时候调用, removeHandler 方法在取消订阅的时候被调用。

Params:

NameTypeAttributeDescription
addHandler function(handler: Function): any

一个接收处理器的函数,并且将 该处理器添加到事件源。

removeHandler function(handler: Function, signal?: any): void
  • optional

可选的 函数,接受处理器函数做为参数,可以移除处理器当之前使用addHandler添加处理器。如果 addHandler 返回的信号当移除的时候要清理,removeHandler 会去做这件事情。

selector function(...args: any): T
  • optional

可选的函数处理结果。 接受事件处理的参数返 回单个的值。

Return:

Observable<T>

Example:

发出 DOM document 上的点击事件
function addClickHandler(handler) {
  document.addEventListener('click', handler);
}

function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}

var clicks = Rx.Observable.fromEventPattern(
  addClickHandler,
  removeClickHandler
);
clicks.subscribe(x => console.log(x));

Test:

See:

public static fromPromise(promise: PromiseLike<T>, scheduler: Scheduler): Observable<T> source

将 Promise 转化为 Observable。

返回一个仅仅发出 Promise resolve 过的值然后完成的 Observable。

把 ES2015 的 Promise 或者兼容 Promises/A+ 规范的 Promise 转化为 Observable。 如果 Promise resolves 一个值, 输出 Observable 发出这个值然后完成。 如果 Promise 被 rejected, 输出 Observable 会发出相应的 错误。

Params:

NameTypeAttributeDescription
promise PromiseLike<T>

被转化的 promise。

scheduler Scheduler
  • optional

可选的调度器,用来调度 resolved 或者 rejection 的值。

Return:

Observable<T>

包装了 Promise 的 Observable。

Example:

将 Fetch 返回的 Promise 转化为 Observable。
var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));

Test:

See:

public static interval(period: number, scheduler: Scheduler): Observable source

创建一个 Observable ,该 Observable 使用指定的 IScheduler ,并以指定时间间隔发出连续的数字。

定期发出自增的数字。

interval 返回一个发出无限自增的序列整数, 你可以选择固定的时间间隔进行发送。 第一次并 没有立马去发送, 而是第一个时间段过后才发出。 默认情况下, 这个操作符使用 async 调度器来 提供时间的概念,但也可以给它传递任意调度器。

Params:

NameTypeAttributeDescription
period number
  • optional
  • default: 0

时间间隔,它以毫秒为单位(默认),或者由调度器的内部时钟决定的时间单位。

scheduler Scheduler
  • optional
  • default: async

调度器,用来调度值的发送并提供”时间“的概念。

Return:

Observable

每个时间间隔都发出自增数的 Observable 。

Example:

每1秒发出一个自增数
var numbers = Rx.Observable.interval(1000);
numbers.subscribe(x => console.log(x));

Test:

See:

public static merge(observables: ...ObservableInput, concurrent: number, scheduler: Scheduler): Observable source

创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中值。

通过把多个 Observables 的值混合到一个 Observable 中来将其打平。

merge 订阅每个给定的输入 Observable (作为参数),然后只是将所有输入 Observables 的所有值发 送(不进行任何转换)到输出 Observable 。所有的输入 Observable 都完成了,输出 Observable 才 能完成。任何由输入 Observable 发出的错误都会立即在输出 Observalbe 上发出。

Params:

NameTypeAttributeDescription
observables ...ObservableInput

合并到一起的输入Observables。

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

可以同时订阅的输入 Observables 的最大数量。

scheduler Scheduler
  • optional
  • default: null

调度器用来管理并行的输入Observables。

Return:

Observable

该 Observable 发出的项是每个输入 Observable 的结果。

Example:

合并两个 Observables: 时间间隔为1秒的 timer 和 clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = Rx.Observable.merge(clicks, timer);
clicksOrTimer.subscribe(x => console.log(x));

// 结果如下:
// 每隔1s发出一个自增值到控制台
// document被点击的时候MouseEvents会被打印到控制台
// 因为两个流被合并了,所以你当它们发生的时候你就可以看见.
合并3个Observables, 但是只并行运行2个
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // the argument
var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));

// 结果如下:
// - timer1和timer2将会并行运算
// - timer1每隔1s发出值,迭代10次
// - timer2每隔1s发出值,迭代6次
// - timer1达到迭代最大次数,timer2会继续,timer3开始和timer2并行运行
// - 当timer2达到最大迭代次数就停止,timer3将会继续每隔500ms发出数据直到结束

See:

public static never(): Observable source

创建一个不向观察者发出任何项的 Observable 。

从不发出任何项的 Observable 。

这个静态操作符对于创建既不发出数据也不触发错误和完成通知的 Observable。 可以用来测试或 者和其他 Observables进行组合。 注意,由于不会发送完成通知,这个 Observable 的 subscription 不会被自动地清理。Subscriptions 需要手动清理。

Return:

Observable

A "never" Observable:从不发出任何东西的 Observable 。

Example:

发出7, 然后不发出任何值(也不发出完成通知)。
function info() {
  console.log('Will not be called');
}
var result = Rx.Observable.never().startWith(7);
result.subscribe(x => console.log(x), info, info);

Test:

See:

public static of(values: ...T, scheduler: Scheduler): Observable<T> source

创建一个 Observable,它会依次发出由你提供的参数,最后发出完成通知。 发出你提供的参数,然后完成。

这个静态操作符适用于创建简单的 Observable ,该 Observable 只发出给定的参数, 在发送完这些参数后发出完成通知。它可以用来和其他 Observables 组合比如说concat。 默认情况下,它使用null调度器,这意味着next通知是同步发出的, 尽管使用不同的调度器可以决定这些通知何时送到。

Params:

NameTypeAttributeDescription
values ...T

表示 next 发出的值。

scheduler Scheduler
  • optional

用来调度 next 通知发送的调度器( IScheduler )。

Return:

Observable<T>

发出每个给定输入值的 Observable。

Example:

发出 10、20、 30, 然后是 'a'、 'b'、 'c', 紧接着开始每秒发出。
var numbers = Rx.Observable.of(10, 20, 30);
var letters = Rx.Observable.of('a', 'b', 'c');
var interval = Rx.Observable.interval(1000);
var result = numbers.concat(letters).concat(interval);
result.subscribe(x => console.log(x));

Test:

See:

public static range(start: number, count: number, scheduler: Scheduler): Observable source

创建一个 Observable ,它发出指定范围内的数字序列。

发出区间范围内的数字序列。

range 操作符顺序发出一个区间范围内的连续整数, 你可以决定区间的开始和长度。 默认情况下, 不使用 调度器仅仅同步的发送通知, 但是也可以可选的使用可选的调度器来控制发送。

Params:

NameTypeAttributeDescription
start number
  • optional
  • default: 0

序列中的第一个整数值。

count number
  • optional
  • default: 0

要生成序列的长度。

scheduler Scheduler
  • optional

调度器 ( IScheduler ),用来调度通知的发送。

Return:

Observable

该 Observable 发出有限区间范围内的连续整数。

Example:

发出从1到10的数
var numbers = Rx.Observable.range(1, 10);
numbers.subscribe(x => console.log(x));

Test:

See:

public static throw(将具体的: any, scheduler: Scheduler): Observable source

创建一个不发送数据给观察者并且立马发出错误通知的 Observable。

仅仅发出 error 通知,其他什么也不做。

这个静态操作符对于创建简单的只发出错误通知的 Observable 十分有用。 可以被用来和其他 Observables 组合, 比如在 mergeMap 中使用。

Params:

NameTypeAttributeDescription
将具体的 any

Error 传递给错误通知。

scheduler Scheduler
  • optional

调度器IScheduler,用来调度错误通知的发送。

Return:

Observable

错误的 Observable:只使用给定的错误参数发出错误通知。

Example:

先发出数字7,然后发出错误通知。
var result = Rx.Observable.throw(new Error('oops!')).startWith(7);
result.subscribe(x => console.log(x), e => console.error(e));
映射并打平成字母序列abc,但当数字为13时抛出错误。
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
  x === 13 ?
    Rx.Observable.throw('Thirteens are bad') :
    Rx.Observable.of('a', 'b', 'c')
);
result.subscribe(x => console.log(x), e => console.error(e));

Test:

See:

public static timer(initialDelay: number | Date, period: number, scheduler: Scheduler): Observable source

创建一个 Observable,该 Observable 在初始延时(initialDelay)之后开始发送并且在每个时间周期( period)后发出自增的数字。

就像是interval, 但是你可以指定什么时候开始发送。

timer 返回一个发出无限自增数列的 Observable, 具有一定的时间间隔,这个间隔由你来选择。 第一个发送发生在 初始延时之后. 初始延时就像是Date。 默认情况下, 这个操作符使用 async 调度器来提供时间的概念, 但是你也可以传递任何调度器。 如果时间周期没有被指定, 输出 Observable 只发出0。 否则,会发送一个无限数列。

Params:

NameTypeAttributeDescription
initialDelay number | Date

在发出第一个值 0 之前等待的初始延迟时间。

period number
  • optional

连续数字发送之间的时间周期。

scheduler Scheduler
  • optional
  • default: async

调度器,用来调度值的发送, 提供“时间”的概念。

Return:

Observable

该 Observable 在初始时延(initialDelay)后发出0,并且在之后的每个时间周期(period)后发出按自增的数字。

Example:

每隔1秒发出自增的数字,3秒后开始发送。
var numbers = Rx.Observable.timer(3000, 1000);
numbers.subscribe(x => console.log(x));
5秒后发出一个数字
var numbers = Rx.Observable.timer(5000);
numbers.subscribe(x => console.log(x));

Test:

See:

public static webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject source

包装浏览器提供的兼容w3c的WebSocket对象.

Params:

NameTypeAttributeDescription
urlConfigOrSource string | WebSocketSubjectConfig

the source of the websocket as an url or a structure defining the websocket object

Example:

包装浏览器的WebSocket

let socket$ = Observable.webSocket('ws://localhost:8081');

socket$.subscribe(
   (msg) => console.log('message received: ' + msg),
   (err) => console.log(err),
   () => console.log('complete')
 );

socket$.next(JSON.stringify({ op: 'hello' }));
包装nodejs的WebSocket

import { w3cwebsocket } from 'websocket';

let socket$ = Observable.webSocket({
  url: 'ws://localhost:8081',
  WebSocketCtor: w3cwebsocket
});

socket$.subscribe(
   (msg) => console.log('message received: ' + msg),
   (err) => console.log(err),
   () => console.log('complete')
 );

socket$.next(JSON.stringify({ op: 'hello' }));

Test:

public static zip(observables: *): Observable<R> source

将多个 Observable 组合以创建一个 Observable,该 Observable 的值是由所有输入 Observables 的值按顺序计算而来的。

如果最后一个参数是函数, 这个函数被用来计算最终发出的值.否则, 返回一个顺序包含所有输入值的数组.

Params:

NameTypeAttributeDescription
observables *

Return:

Observable<R>

Example:

从不同的源头结合年龄和名称

let age$ = Observable.of<number>(27, 25, 29);
let name$ = Observable.of<string>('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of<boolean>(true, true, false);

Observable
    .zip(age$,
         name$,
         isDev$,
         (age: number, name: string, isDev: boolean) => ({ age, name, isDev }))
    .subscribe(x => console.log(x));

// 输出:
// { age: 27, name: 'Foo', isDev: true }
// { age: 25, name: 'Bar', isDev: true }
// { age: 29, name: 'Beer', isDev: false }

Test:

Public Constructors

public constructor(subscribe: Function) source

Params:

NameTypeAttributeDescription
subscribe Function

当 Observable 初始订阅的时候会调用该方法. 该函数接受 Subscriber, 这样就可以 next 值,或者 error 方法会被调用以引发错误,或者 complete 被调用以通知成功的完成。

Public Methods

public [Symbol_observable](): Observable source

An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable

Return:

Observable

this instance of the observable

public audit(durationSelector: function(value: T): SubscribableOrPromise): Observable<T> source

在另一个 Observable 决定的时间段里忽略源数据,然后发出源 Observable 中最新发出的值, 然后重复此过程。

就像是auditTime, 但是沉默持续时间段由第二个 Observable 决定。

auditthrottle 很像, 但是发出沉默时间窗口的最后一个值, 而不是第一个。只要 audit 的内部时间器被禁用, 它就会在输出 Observable 上发出源 Observable 的最新值,并且当时间器启用时忽略源值。初始时,时间器是禁用的。 只要第一个源值到达,时间器是用源值调用 durationselector 方法启用,返回 "duration" Observable。 当 duration Observable 发出数据或者完成时,时间器禁用,然后输出 Observable 发出最新的源值,并且不断的重复这个过程。

Params:

NameTypeAttributeDescription
durationSelector function(value: T): SubscribableOrPromise

该函数从源 Observable 中接收值,用于为每个源值计算沉默持续时间,并返回 Observable 或 Promise 。

Return:

Observable<T>

该 Observable 限制源 Observable 的发送频率。

Example:

以每秒最多点击一次的频率发出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.audit(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public auditTime(duration: number, scheduler: Scheduler): Observable<T> source

duration 毫秒内忽略源值,然后发出源 Observable 的最新值, 并且重复此过程。

当它看见一个源值,它会在接下来的 duration 毫秒内忽略这个值以及接下来的源值,过后发出最新的源值。

auditTimethrottleTime 很像, 但是发送沉默时间窗口的最后一个值, 而不是第一个。只要 audit 的内部时间器被禁用,它就会在 输出 Observable 上发出源 Observable 的最新值,并且当定时器启用时忽略源值。初始时,时间器是禁用的。只要第一个值到达, 时间器被启用。度过 持续时间后(或者时间单位由内部可选的参数调度器决定),时间间隔被禁用, 输出 Observable 发出最新的值, 不断的重复这个过程。可选项 IScheduler 用来管理时间器。

Params:

NameTypeAttributeDescription
duration number

以毫秒为单位或以可选的 scheduler 内部决定的时间单位来衡量。

scheduler Scheduler
  • optional
  • default: async

调度器( IScheduler ),用来管理处理限制发送频率的定时器。

Return:

Observable<T>

该 Observable 限制源 Observable 的发送频率。

Example:

以每秒最多点击一次的频率发出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.auditTime(1000);
result.subscribe(x => console.log(x));

Test:

See:

public buffer(closingNotifier: Observable<any>): Observable<T[]> source

缓冲源 Observable 的值直到 closingNotifier 发出。

将过往的值收集到一个数组中,并且仅当另一个 Observable 发出通知时才发出此数组。

将 Observable 发出的值缓冲起来直到 closingNotifier 发出数据, 在这个时候在输出 Observable 上发出该缓冲区的值并且内部开启一个新的缓冲区, 等待下一个closingNotifier的发送。

Params:

NameTypeAttributeDescription
closingNotifier Observable<any>

该 Observable 向输出 Observale 发出信号以通知发出缓冲区。

Return:

Observable<T[]>

数组缓冲的 Observable。

Example:

每次点击发出 interval Observable 最新缓冲的数组。
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var buffered = interval.buffer(clicks);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]> source

缓冲源 Observable 的值直到缓冲数量到达设定的 bufferSize.

将过往的值收集到一个数组中,当数组数量到达设定的 bufferSize 时发出该数组。

缓冲源 Observable 的N个值(N = bufferSize),然后发出该缓冲区并进行清理,再然后开启一个新的缓存区,新缓存区会新缓存M个值(M = startBufferEvery)。 如果startBufferEvery没有提供或者为null, 新的缓冲会在源开始的时候开启并且在每次发出的时候关闭。

Params:

NameTypeAttributeDescription
bufferSize number

缓存区的最大长度。

startBufferEvery number
  • optional

确定何时启用新的缓冲区。 例如上面图中所示,如果startBufferEvery2, 那么隔一个数据会开一个新 的缓冲区。 默认情况下,将在源的起始处启用新的缓冲区。

Return:

Observable<T[]>

缓存值数组的 Observable 。

Example:

将最后两次点击事件作为数组发出
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2);
buffered.subscribe(x => console.log(x));
在每次点击的时候, 以数组的形式发出最后两次点击
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2, 1);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]> source

在特定时间周期内缓冲源 Observable 的值。

将过往的值收集到数组中,并周期性地发出这些数组。

在一个特定的持续时间bufferTimeSpan内缓存源 Observable 的值。 除非指定了可选参数bufferCreationInterval , 它会发出数组并且重置缓冲区每个bufferTimeSpan毫秒。这个操作符会在每个 bufferCreationInterval 毫秒时开启缓冲区, 并在每个 bufferTimeSpan 毫秒时关闭(发出并重置)缓冲区。如果可选参数maxBufferSize被指定, 缓冲区会在bufferTimeSpan毫秒 之后或者缓冲区元素个数达到maxBufferSize时发出。

Params:

NameTypeAttributeDescription
bufferTimeSpan number

填满每个缓冲数组的时间。

bufferCreationInterval number
  • optional

开启新缓冲区的时间间隔。

maxBufferSize number
  • optional

缓冲区的最大容量。

scheduler Scheduler
  • optional
  • default: async

调度器,调度缓冲区。

Return:

Observable<T[]>

值为缓冲数组的 observable。

Example:

每一秒都发出最新点击事件的数组
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(1000);
buffered.subscribe(x => console.log(x));
每5秒钟,发出接下来2秒内的点击事件(译者注:后3秒内的点击会被忽略)
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(2000, 5000);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferToggle(openings: SubscribableOrPromise<O>, closingSelector: function(value: O): SubscribableOrPromise): Observable<T[]> source

缓冲源 Observable 的值,openings 发送的时候开始缓冲,closingSelector 发送的时候结束缓冲。

将过往数据收集到数组中. 当opening发送的时候开始收集, 然后调用closingSelector 函数获取 Observable ,该Observable 告知什么时候关闭缓冲。

缓冲源Observable的值,当openingsObservable发出信号的时候开始缓冲数据, 当closingSelector返回的Subscribable 或者Promise发送的时候结束并且发送缓冲区.

Params:

NameTypeAttributeDescription
openings SubscribableOrPromise<O>

开启新缓冲区的通知,可以是 Subscribable 或 Promise 。

closingSelector function(value: O): SubscribableOrPromise

接受openingsobservable 发出的数据返回一个可以被订阅的对象或者Promise的函数,当它发出时,会发信号给相关的缓冲区以通知它们应该发出并清理。

Return:

Observable<T[]>

缓冲数组的 observable。

Example:

每隔一秒钟,发出接下来500毫秒内的点击事件。
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var buffered = clicks.bufferToggle(openings, i =>
  i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
);
buffered.subscribe(x => console.log(x));

Test:

See:

public bufferWhen(closingSelector: function(): Observable): Observable<T[]> source

缓冲源 Observable 的值, 使用关闭 Observable 的工厂函数来决定何时关闭、发出和重置缓冲区。

将过往的值收集到数组中, 当开始收集数据的时候, 调用函数返回 Observable, 该 Observable 告知何时关闭缓冲区并重新开始收集。

立马开启缓冲区, 然后当closingSelector函数返回的observable发出数据的时候关闭缓冲区. 当关闭缓冲区的时候, 会立马开启新的缓冲区,并不断重复此过程。

Params:

NameTypeAttributeDescription
closingSelector function(): Observable

该函数不接受参数,并返回通知缓冲区关闭的 Observable 。

Return:

Observable<T[]>

缓冲数组的 Observable 。

Example:

发出每个随机秒(1-5秒)数内的最新点击事件数组。
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferWhen(() =>
  Rx.Observable.interval(1000 + Math.random() * 4000)
);
buffered.subscribe(x => console.log(x));

Test:

See:

public catch(selector: function): Observable source

捕获 observable 中的错误,可以通过返回一个新的 observable 或者抛出错误对象来处理。

Params:

NameTypeAttributeDescription
selector function

该函数接受 err 参数,即错误对象,还接受 caught 参数,即源 Observable, 当你想“重试”的时候返回它即可。 无论 selector 函数返回的 observable 是什么,都会被用来继续执行 observable 链。

Return:

Observable

该 Observable 源自源 Observable 或 selector 函数返回的 Observable。

Example:

当发生错误的时候通过返回一个新的 Observable 继续运行

Observable.of(1, 2, 3, 4, 5)
  .map(n => {
	   if (n == 4) {
	     throw 'four!';
    }
   return n;
  })
  .catch(err => Observable.of('I', 'II', 'III', 'IV', 'V'))
  .subscribe(x => console.log(x));
  // 1, 2, 3, I, II, III, IV, V
当发生错误的时候重试源 Observable, 和retry()操作符类似

Observable.of(1, 2, 3, 4, 5)
  .map(n => {
	   if (n === 4) {
	     throw 'four!';
    }
	   return n;
  })
  .catch((err, caught) => caught)
  .take(30)
  .subscribe(x => console.log(x));
  // 1, 2, 3, 1, 2, 3, ...
当源 Observable 发生错误的时候,抛出一个新的错误

Observable.of(1, 2, 3, 4, 5)
  .map(n => {
    if (n == 4) {
      throw 'four!';
    }
    return n;
  })
  .catch(err => {
    throw 'error in source. Details: ' + err;
  })
  .subscribe(
    x => console.log(x),
    err => console.log(err)
  );
  // 1, 2, 3, error in source. Details: four!

Test:

public combineAll(project: function): Observable source

通过等待外部 Observable 完成然后应用 combineLatest ,将高阶 Observable 转化为一阶 Observable。

当高阶 Observable 完成时,通过使用 combineLatest 将其打平。

接受一个返回 Observables 的 Observable, 并从中收集所有的 Observables 。 一旦最外部的 Observable 完成, 会订阅所有收集的 Observables 然后通过combineLatest合并值, 这样:

  • 每次内部 Observable 发出的时候, 外部 Observable 也发出。
  • 当返回的 observable 发出的时候, 它会通过如下方式发出所有最新的值:
    • 如果提供了`project`函数, 该函数会按内部 Observable 到达的顺序依次使用每个内部 Observable 的最新值进行调用。
    • 如果没有提供`project`函数, 包含所有最新数据的数组会被输出 Observable 发出。

Params:

NameTypeAttributeDescription
project function
  • optional

它按顺序的从每个收集到的内部 Observable 中接收最新值作为参数。

Return:

Observable

该 Observable 为最新值的投射结果或数组。

Example:

将两个点击事件映射为有限的 interval Observable,然后应用 combineAll
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map(ev =>
  Rx.Observable.interval(Math.random()*2000).take(3)
).take(2);
var result = higherOrder.combineAll();
result.subscribe(x => console.log(x));

Test:

See:

public combineLatest(other: ObservableInput, project: function): Observable source

组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。

它将使用所有输入中的最新值计算公式,然后发出该公式的输出。

combineLatest 结合传入的多个 Observables。 通过顺序的订阅每个输入Observable, 在每次任一输入Observables发送的时候收集 每个输入Observables最新的值组成一个数组, 然后要么将这个数组传给可选的投射函数并发送投射函数返回的结果, 或者在没有提供投射函数时仅仅发出该数组。

Params:

NameTypeAttributeDescription
other ObservableInput

将要和源 Observable 结合的输入 Observable。 可以传入多个输入 Observables。

project function
  • optional

可选的投射函数,将输出 Observable 返回的值投射为要发出的新的值。

Return:

Observable

该 Observable 为每个输入 Observable 的最新值的投射结果或数组。

Example:

根据一个身高的 Observable 和一个体重的 Observable 动态的计算BMI指数
var weight = Rx.Observable.of(70, 72, 76, 79, 75);
var height = Rx.Observable.of(1.76, 1.77, 1.78);
var bmi = weight.combineLatest(height, (w, h) => w / (h * h));
bmi.subscribe(x => console.log('BMI is ' + x));

// With output to console:
// BMI is 24.212293388429753
// BMI is 23.93948099205209
// BMI is 23.671253629592222

See:

public concat(other: ObservableInput, scheduler: Scheduler): Observable source

创建一个输出 Observable,它在当前 Observable 之后顺序地发出每个给定的输入 Observable 中的所有值。

通过顺序地发出多个 Observables 的值将它们连接起来,一个接一个的。

通过依次订阅输入Observable将输出Observable加入多个输入Observable,从源头开始, 合并它们的值给输出Observable. 只有前一个Observable结束才会进行下一个Observable。

Params:

NameTypeAttributeDescription
other ObservableInput

等待被连接的 Observable。 可以接受多个输入 Observable。

scheduler Scheduler
  • optional
  • default: null

可选的调度器,控制每个输入 Observable 的订阅。

Return:

Observable

顺序的、串行的将所有输入 Observable 的值合并给输出 Observable。

Example:

将从0数到3的定时器和从1到10的同步序列进行连接
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = timer.concat(sequence);
result.subscribe(x => console.log(x));

// results in:
// 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
连接3个Observables
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var result = timer1.concat(timer2, timer3);
result.subscribe(x => console.log(x));

// results in the following:
// (Prints to console sequentially)
// -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
// -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
// -500ms-> 0 -500ms-> 1 -500ms-> ... 9

Test:

See:

public concatAll(): Observable source

通过顺序地连接内部 Observable,将高阶 Observable 转化为一阶 Observable 。

通过一个接一个的连接内部 Observable ,将高阶 Observable 打平。

串行连接源(高阶 Observable)所发出的每个 Observable,只有当一个内部 Observable 完成的时候才订阅下 一个内部 Observable,并将它们的所有值合并到返回的 Observable 中。

警告: 如果源 Observable 很快并且不停的发送 Observables, 内部 Observables 发送的完成 通知比源 Observable 慢, 你会遇到内存问题,因为传入的 Observables 在无界缓冲区中收集.

注意: concatAll 等价于 concurrency 参数(最大并发数)为1的 mergeAll 。

Return:

Observable

Observable,该 Observable 串联地发出所有内部 Observables 的值。

Example:

每次点击都会触发从0到3的定时器(时间间隔为1秒),定时器之间是串行的
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map(ev => Rx.Observable.interval(1000).take(4));
var firstOrder = higherOrder.concatAll();
firstOrder.subscribe(x => console.log(x));

// 结果如下:
// (结果是串行的)
// 对于"document"对象上的点击事件,都会以1秒的间隔发出从0到3的值
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3

Test:

See:

public concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

将源值投射为一个合并到输出 Observable 的 Observable,以串行的方式等待前一个完成再合并下一个 Observable。

将每个值映射为 Observable, 然后使用concatAll将所有的 内部 Observables 打平。

返回一个 Observable,该 Observable 发出基于对源 Observable 发出的值调用提供的函数, 该函数返回所谓的内部 Observable。 每个新的内部 Observable 和前一个内部 Observable 连接在一起。

警告: 如果源值不断的到达并且速度快于内部 Observables 完成的速度, 它会导致内存问题, 因为内部的 Observable 在无限制的缓冲区中聚集,以等待轮流订阅。

Note: `concatMap` 等价于 `concurrency` 参数(最大并发数)为1的 `mergeMap` 。

Params:

NameTypeAttributeDescription
project function(value: T, ?index: number): ObservableInput

用在源Observable发出的每个值上,返回Observable.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。 传递给这个函数参数有:

  • outerValue: 来自源的值
  • innerValue: 来自投射的 Observable 的值
  • outerIndex: 来自源的值的 "index"
  • innerIndex: 来自投射的 Observable 的值的 "index"

Return:

Observable

Observable,发出对源Observable发出的每个值使用投射函数 (和可选的resultSelector)的结果并且顺序的取出每个投射过的内部Observable的值.

Example:

每次点击都会触发从0到3的定时器(时间间隔为1秒),定时器之间是串行的
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMap(ev => Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));

// 结果如下:
// (结果是串行的)
// 对于"document"对象上的点击事件,都会以1秒的间隔发出从0到3的值
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3

Test:

See:

public concatMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

将每个源值投射成同一个 Observable ,该 Observable 会以串行的方式多次合并到输出 Observable 中 。

就像是concatMap, 但是将每个值总是映射为同一个内部 Observable。

不管源值是多少都将其映射为给定的innerObservable, 然后将其打平为单个 Observable,也就 是所谓的输出 Observable。 在输出 Observable 上发出的每个新的 innerObservable 实例与 先前的 innerObservable 实例相连接。

警告: 如果源值不断的到达并且速度快于内部Observables完成的速度, 它会导致内存问题 因为内部的 Observable 在无限制的缓冲区中聚集,以等待轮流订阅。

Note: concatMapTo is equivalent to mergeMapTo with concurrency parameter set to 1.

Params:

NameTypeAttributeDescription
innerObservable ObservableInput

Observable,替换源Observable的每个值.

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。 传递给这个函数参数有:

  • outerValue: 来自源的值
  • innerValue: 来自投射的 Observable 的值
  • outerIndex: 来自源的值的 "index"
  • innerIndex: 来自投射的 Observable 的值的 "index"

Return:

Observable

observable,值由将每个源值投射的observable串行合并而成.

Example:

每次点击都会触发从0到3的定时器(时间间隔为1秒),定时器之间是串行的
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMapTo(Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));

// 结果如下:
// (结果不是并行的)
// 对于"document"对象上的点击事件,都会以1秒的间隔发出从0到3的值
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3

Test:

See:

public count(predicate: function(value: T, i: number, source: Observable<T>): boolean): Observable source

计算源的发送数量,并当源完成时发出该数值。

当源完成的时候,告知总共发送了多少个值。

count 将发送数据的 Observable 转化为只发出源 Observable 总共发出的数据项的 Observable。 如果源 Observable 发生错误, count将会发出错误而不是发出值。 如果源 Observable 一直不终结, count 既不会终结也不会发出数据。 这个操作符接受可选的predicate函数做为参数, 在这种情况下,输出则表示源值中满足 predicate 函数的值的数量。

Params:

NameTypeAttributeDescription
predicate function(value: T, i: number, source: Observable<T>): boolean
  • optional

A boolean 函数,用来选择哪些值会被计数。 参数如下:

  • value: 来自源的值
  • index: 来自投射的 Observable 的值的 "index"(从0开始)
  • source: 源 Observable 自身实例。

Return:

Observable

数字类型的 Observable,该数字表示如上所述的计数。

Example:

记录第一次点击之前经过了几秒
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var secondsBeforeClick = seconds.takeUntil(clicks);
var result = secondsBeforeClick.count();
result.subscribe(x => console.log(x));
记录1到7中间有多少个奇数
var numbers = Rx.Observable.range(1, 7);
var result = numbers.count(i => i % 2 === 1);
result.subscribe(x => console.log(x));

// 结果是:
// 4

Test:

See:

public debounce(durationSelector: function(value: T): SubscribableOrPromise): Observable source

只有在另一个 Observable 决定的一段特定时间经过后并且没有发出另一个源值之后,才从源 Observable 中发出一个值。

就像是 debounceTime, 但是静默时间段由第二个 Observable 决定。

debounce 延时发送源 Observable 发出的值,但如果源 Observable 发出了新值 的话,它会丢弃掉前一个等待中的延迟发送。这个操作符会追踪源 Observable 的最新值, 并通过调用 durationSelector 函数来生产 duration Observable。只有当 duration Observable 发出值或完成时,才会发出值,如果源 Observable 上没有发 出其他值,那么 duration Observable 就会产生。如果在 duration Observable 发 出前出现了新值,那么前一个值会被丢弃并且不会在输出 Observable 上发出。

就像debounceTime, 这是一个限制发出频率的操作符, 因为输出发送并不一定是 在同一时间发生的,就像它们在源 Observable 上所做的那样。

Params:

NameTypeAttributeDescription
durationSelector function(value: T): SubscribableOrPromise

该函数接受 源Observable的值, 用于计算每个值的延迟持续时间, 返回一个Observable或者Promise.

Return:

Observable

Observable,通过 durationSelector 返回的特定 duration Observable 来延迟源 Observable 的发送,如果发送过于频繁可能会丢弃一些值。

Example:

在一顿狂点后只发出最新的点击
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounce(() => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public debounceTime(dueTime: number, scheduler: Scheduler): Observable source

只有在特定的一段时间经过后并且没有发出另一个源值,才从源 Observable 中发出一个值。

就像是delay, 但是只通过每次大量发送中的最新值。

debounceTime 延时发送源 Observable 发送的值,但是会丢弃正在排队的发送如果源 Observable 又发出新值。 该操作符会追踪源 Observable 的最新值, 并且发出它当且仅当在 dueTime 时间段内 没有发送行为。 如果新的值在dueTime静默时间段出现, 之前的值会被丢弃并且不会在输出 Observable 中发出。

这是一个控制发送频率的操作符,因为不可能在任何时间窗口的持续时间(dueTime)内发出一个以上的值,同样也是一个延时类操作符,因为输出 并不一定发生在同一时间,正如源 Observable 上发生的。 可选性的接收一个 IScheduler 用于管理定时器。

Params:

NameTypeAttributeDescription
dueTime number

在发送最新的源值之前需要等待的以毫秒为单位(或者由可选的scheduler 提供的时间单位)的时间间隔。

scheduler Scheduler
  • optional
  • default: async

调节器( IScheduler ),用于管理处理每个值的延时的定时器。

Return:

Observable

Observable,通过指定的 dueTime 来延迟源 Observable 的发送,如果发送过于频繁可能会丢弃一些值。

Example:

在一顿狂点后只发出最新的点击
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounceTime(1000);
result.subscribe(x => console.log(x));

Test:

See:

public defaultIfEmpty(defaultValue: any): Observable source

如果源 Observable 在完成之前没有发出任何 next 值,则发出给定的值,否则返回 Observable 的镜像。

如果源Observable本来就是空的,那么这个操作符会发出一个默认值。

如果源 Observable 是空的(在完成之前没有发出任何 next 值),那么 defaultIfEmpty 会发出源 Observable 或指定的默认值。

Params:

NameTypeAttributeDescription
defaultValue any
  • optional
  • default: null

如果源Observable是空的话使用的默认值。

Return:

Observable

Observable, 当源 Observable 不发出值时,该 Observable 发出指定的 defaultValue ,否则发出源 Observable 所发出的值。

Example:

如果在5秒内没有点击事件发生,发出"no clicks"
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksBeforeFive = clicks.takeUntil(Rx.Observable.interval(5000));
var result = clicksBeforeFive.defaultIfEmpty('no clicks');
result.subscribe(x => console.log(x));

Test:

See:

public delay(delay: number | Date, scheduler: Scheduler): Observable source

通过给定的超时或者直到一个给定的时间来延迟源 Observable 的发送。

每个数据项的发出时间都往后推移固定的毫秒数.

如果延时参数是数字, 这个操作符会将源 Observable 的发出时间都往后推移固定的毫秒数。 保存值之间的相对时间间隔.

如果延迟参数是日期类型, 这个操作符会延时Observable的执行直到到了给定的时间.

Params:

NameTypeAttributeDescription
delay number | Date

延迟时间(以毫秒为单位的数字)或 Date 对象(发送延迟到这个时间点)。

scheduler Scheduler
  • optional
  • default: async

调度器,用来管理处理每项时延的定时器。

Return:

Observable

该 Observalbe 通过指定的超时时间或日期来延迟源 Observable 的发送。

Example:

每次点击延迟1秒
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
delayedClicks.subscribe(x => console.log(x));
延时所有的点击直到到达未来的时间点
var clicks = Rx.Observable.fromEvent(document, 'click');
var date = new Date('March 15, 2050 12:00:00'); // in the future
var delayedClicks = clicks.delay(date); // click emitted only after that date
delayedClicks.subscribe(x => console.log(x));

Test:

See:

public delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable source

在给定的时间范围内,延迟源 Observable 所有数据项的发送,该时间段由另一个 Observable 的发送决定。

就像是delay, 但是延时的时间间隔由第二个Observable决定.

delayWhen 通过由另一个 Observable 决定的时间段来延迟源 Observable 的每个发出值。 当源发出一个数据,delayDurationSelector 函数将该源值当做参数, 返回一个被称为“持续”的 Observable。 当且仅当持续的 Observable 发出或完成时,源值才会在输出 Observable 上发出。

可选的, delayWhen 接受第二个参数, subscriptionDelay,它是一个 Observable。 当 subscriptionDelay 发出第一个值或者完成,源 Observable 被订阅并且开始像前一段描 述的一样。 如果 subscriptionDelay 没有提供,delayWhen 将会订阅源 Observable 只 要输出 Observable 被订阅。

Params:

NameTypeAttributeDescription
delayDurationSelector function(value: T): Observable

该函数为源 Observable 发出的每个值返回一个 Observable,用于延迟在输出 Observable 上的发送,直到返回的 Observable 发出值。

subscriptionDelay Observable

Observable,该 Observable 一旦发出任何值则触发源 Observable 的订阅。

Return:

Observable

Observable,延时源Observable的发出时间,该时间由delayDurationSelector 返回的Observable决定.

Example:

将每次点击延迟0到5秒的随机时间
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delayWhen(event =>
  Rx.Observable.interval(Math.random() * 5000)
);
delayedClicks.subscribe(x => console.log(x));

Test:

See:

public dematerialize(): Observable source

Notification 对象的 Observable 转换成它们所代表的发送。

Notification 对象拆开成实际的 nexterrorcomplete 发送。它与 materialize 是相反的。

dematerialize 被假定用来操作只发送值为 Notification 对象的 next, 不发送 error 的 Observable 。这样的 Obseravble 其实是 materialize 操作符的输出。然后这些通知会使用它们所包含的元数据进行拆解,并在输出 Observable 上 发出 nexterrorcomplete

materialize 结合来使用此操作符。

Return:

Observable

该 Observable 会发出数据项和通知,它们是由源 Observable 所发出 并且包装在 Notification 对象之中的。

Example:

将 Notification 类型的 Observable 转换成实际的 Observable
var notifA = new Rx.Notification('N', 'A');
var notifB = new Rx.Notification('N', 'B');
var notifE = new Rx.Notification('E', void 0,
  new TypeError('x.toUpperCase is not a function')
);
var materialized = Rx.Observable.of(notifA, notifB, notifE);
var upperCase = materialized.dematerialize();
upperCase.subscribe(x => console.log(x), e => console.error(e));

// 结果:
// A
// B
// TypeError: x.toUpperCase is not a function

Test:

See:

public distinct(keySelector: function, flushes: Observable): Observable source

返回 Observable,它发出由源 Observable 所发出的所有与之前的项都不相同的项。

如果提供了 keySelector 函数,那么它会将源 Observable 的每个值都投射成一个新的值,这个值会用来检查是否与先前投射的值相等。如果没有提供 keySelector 函数,它会直接使用源 Observable 的每个值来检查是否与先前的值相等。

在支持 Set 的 JavaScript 运行时中,此操作符会使用 Set 来提升不同值检查的性能。

在其他运行时中,此操作符会使用 Set 的最小化实现,此实现在底层依赖于 ArrayindexOf,因为要检查更多的值来进行区分,所以性能会降低。 即使是在新浏览器中,长时间运行的 distinct 操作也可能会导致内存泄露。为了在某种场景下来缓解这个问题,可以提供一个可选的 flushes 参数, 这样内部的 Set 可以被“清空”,基本上清除了它的所有值。

Params:

NameTypeAttributeDescription
keySelector function
  • optional

可选函数,用来选择某个键的值以检查是否是不同的。

flushes Observable
  • optional

可选 Observable,用来清空操作符内部的 HashSet 。

Return:

Observable

该 Observable 发出从源 Observable 中得到的不同的值。

Example:

使用数字的简单示例
Observable.of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)
  .distinct()
  .subscribe(x => console.log(x)); // 1, 2, 3, 4
使用 keySelector 函数的示例
interface Person {
   age: number,
   name: string
}

Observable.of<Person>(
    { age: 4, name: 'Foo'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo'})
    .distinct((p: Person) => p.name)
    .subscribe(x => console.log(x));

// 显示:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }

Test:

See:

public distinctUntilChanged(compare: function): Observable source

返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项。

如果提供了 compare 函数,那么每一项都会调用它来检验是否应该发出这个值。

如果没有提供 compare 函数,默认使用相等检查。

Params:

NameTypeAttributeDescription
compare function
  • optional

可选比较函数,用来检验当前项与源中的前一项是否相同。

Return:

Observable

该 Observable 发出从源 Observable 中得到的与前一项不同的值。

Example:

使用数字的简单示例
Observable.of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4)
  .distinctUntilChanged()
  .subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4
使用 compare 函数的示例
interface Person {
   age: number,
   name: string
}

Observable.of<Person>(
    { age: 4, name: 'Foo'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo'})
    { age: 6, name: 'Foo'})
    .distinctUntilChanged((p: Person, q: Person) => p.name === q.name)
    .subscribe(x => console.log(x));

// 显示:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }

Test:

See:

public distinctUntilKeyChanged(key: string, compare: function): Observable source

返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项,使用通过提供的 key 访问到的属性来检查两个项是否不同。

如果提供了 compare 函数,那么每一项都会调用它来检验是否应该发出这个值。

如果没有提供 compare 函数,默认使用相等检查。

Params:

NameTypeAttributeDescription
key string

每项中用于查找对象属性的字符串键。

compare function
  • optional

可选比较函数,用来检验当前项与源中的前一项是否相同。

Return:

Observable

该 Observable 发出从源 Observable 中基于指定的 key 得到与前一项不同的值。

Example:

比较人名的示例

 interface Person {
    age: number,
    name: string
 }

Observable.of<Person>(
    { age: 4, name: 'Foo'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo'},
    { age: 6, name: 'Foo'})
    .distinctUntilKeyChanged('name')
    .subscribe(x => console.log(x));

// 显示:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }
比较名字前三个字母的示例

interface Person {
    age: number,
    name: string
 }

Observable.of<Person>(
    { age: 4, name: 'Foo1'},
    { age: 7, name: 'Bar'},
    { age: 5, name: 'Foo2'},
    { age: 6, name: 'Foo3'})
    .distinctUntilKeyChanged('name', (x: string, y: string) => x.substring(0, 3) === y.substring(0, 3))
    .subscribe(x => console.log(x));

// 显示:
// { age: 4, name: 'Foo1' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo2' }

Test:

See:

public do(nextOrObserver: Observer | function, error: function, complete: function): Observable source

为源 Observable 上的每次发送执行副作用,但返回的 Observable 与源 Observable 是相同的。

拦截源 Observable 上的每次发送并且运行一个函数,但返回的输出 Observable 与 源 Observable 是相同的,只要不发生错误即可。

返回源 Observable 的镜像,但镜像是修改过的,以便调用提供的 Observer 来为源 Observable 发出的每个值,错误和完成执行副作用。在上述的 Observer 或处理方法中抛出的任何错误都可以 安全地发送到输出 Observable 的错误路径中。

此操作符适用于调试 Observables 以查看值是否正确,或者执行一些其他的副作用操作。

注意:此操作符不同于 Observable 的 subscribe。如果 do 返回的 Observable 没有被订阅, 那么观察者指定的副作用永远不会执行。因此 do 只是侦查已存在的执行,它不会像 subscribe 那样触发执行的发生。

Params:

NameTypeAttributeDescription
nextOrObserver Observer | function
  • optional

普通的观察者对象或者 next 回调函数。

error function
  • optional

源 Observable 的 error 回调函数。

complete function
  • optional

源 Observable 的 complete 回调函数。

Return:

Observable

与源相同的 Observable,但会为每一项的运行指定观察者或回调函数。

Example:

把每次点击映射成该点击的 clientX ,同时还输出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks
  .do(ev => console.log(ev))
  .map(ev => ev.clientX);
positions.subscribe(x => console.log(x));

See:

public elementAt(index: number, defaultValue: T): Observable source

只发出单个值,这个值位于源 Observable 的发送序列中的指定 index 处。

只发出第i个值, 然后完成。

elementAt 返回的 Observable 会发出源 Observable 指定 index 处的项,如果 index 超出范围并且提供了 default 参数的话,会发出一个默认值。如果没有提供 default 参数并且 index 超出范围,那么输出 Observable 会发出一个 ArgumentOutOfRangeError 错误。

Params:

NameTypeAttributeDescription
index number

是 Subscription 开始后的第i个通知的索引数值,该值是从 0 开始。

defaultValue T
  • optional

缺失索引时返回的默认值。

Return:

Observable

如果能找到这个项的话,那么该 Observable 发出此单个项。找不到时,如果有给定 的默认值,则发出默认值,否则发出错误。

Throw:

ArgumentOutOfRangeError

当使用 elementAt(i) 时,如果 i < 0 或 在发送第i个 next 通知前 Observable 已经完成了,它会发送 ArgumentOutOrRangeError 给观察者的 error 回调函数。

Example:

只发出第三次的点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.elementAt(2);
result.subscribe(x => console.log(x));

// 结果:
// click 1 = nothing
// click 2 = nothing
// click 3 = 打印到控制台的 MouseEvent 对象

Test:

See:

public every(predicate: function, thisArg: any): Observable source

返回的 Observable 发出是否源 Observable 的每项都满足指定的条件。

Params:

NameTypeAttributeDescription
predicate function

用来确定每一项是否满足指定条件的函数。

thisArg any
  • optional

可选对象,作为回调函数中的 this 使用。

Return:

Observable

布尔值的 Observable,用来确定是否源 Observable 的所有项都满足指定条件。

Example:

一个简单示例:如果所有元素都小于5就发出 `true`,反之 `false`
 Observable.of(1, 2, 3, 4, 5, 6)
    .every(x => x < 5)
    .subscribe(x => console.log(x)); // -> false

Test:

public exhaust(): Observable source

当前一个内部 Observable 还未完成的情况下,通过丢弃内部 Observable 使得 高阶 Observable 转换成一阶 Observable。

在当前内部 Observable 仍在执行的情况下,通过丢弃 接下来的内部 Observable 将高阶 Observable 打平。

exhaust 订阅发出 Observables 的 Observable,也就是高阶 Observable 。 每次观察到这些已发出的内部 Observables 中的其中一个时,输出 Observable 开始发出该内部 Observable 要发出的项。到目前为止,它的行为就像 mergeAll 。然而,如果前一个 Observable 还未完成的话,exhaust 会忽略每个新的内部 Observable 。一旦完成,它将接受并打平下一个 内部 Observable ,然后重复此过程。

Return:

Observable

Observable 接收源 Observable 并只专注于传播第一个 Observable 直到它完成,然后订阅下一个 Observable 。

Example:

只要没有当前活动的计时器,那么每次点击就会运行一个有限的计时器。
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(5));
var result = higherOrder.exhaust();
result.subscribe(x => console.log(x));

Test:

See:

public exhaustMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

将每个源值投射成 Observable,只有当前一个投射的 Observable 已经完成, 这个 Observable 才会被合并到输出 Observable 中。

把每个值映射成 Observable,然后使用 exhaust 操作符打平所有的内部 Observables 。

返回的 Observable 基于应用一个函数来发送项,该函数提供给源 Observable 发出的每个项, 并返回一个(所谓的“内部”) Observable 。当它将源值投射成 Observable 时,输出 Observable 开始发出由投射的 Observable 发出的项。然而,如果前一个投射的 Observable 还未完成的话, 那么 exhaustMap 会忽略每个新投射的 Observable 。一旦完成,它将接受并打平下一个 内部 Observable ,然后重复此过程。

Params:

NameTypeAttributeDescription
project function(value: T, ?index: number): ObservableInput

函数, 当应用于源 Observable 发出的项时,返回一个 Observable 。

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。 传递给这个函数参数有:

  • outerValue: 来自源的值
  • innerValue: 来自投射的 Observable 的值
  • outerIndex: 来自源的值的 "index"
  • innerIndex: 来自投射的 Observable 的值的 "index"

Return:

Observable

这个 Observable 包含源中每项的投射 Observable, 忽略在前一个 Observable 完成之前就已经开始的 Observable。

Example:

只要没有当前活动的计时器,那么每次点击就会运行一个有限的计时器。
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.exhaustMap((ev) => Rx.Observable.interval(1000).take(5));
result.subscribe(x => console.log(x));

Test:

See:

public expand(project: function(value: T, index: number), concurrent: number): Observable source

递归地将每个源值投射成 Observable,这个 Observable 会被合并到输出 Observable 中。

它与 mergeMap 类似,但将投射函数应用于每个源值 以及每个输出值。它是递归的。

返回的 Observable 基于应用一个函数来发送项,该函数提供给源 Observable 发出的每个项, 并返回一个 Observable,然后合并这些作为结果的 Observable,并发出本次合并的结果。 expand 会重新发出在输出 Observable 上的每个源值。然后,将每个输出值传给投射函数, 该函数返回要合并到输出 Observable 上的内部 Observable 。由投影产生的那些输出值也会 被传给投射函数以产生新的输出值。这就是 expand 如何进行递归的。

Params:

NameTypeAttributeDescription
project function(value: T, index: number)

函数, 当应用于源 Observable 或输出 Observable 发出的项时,返回一个 Observable 。

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

同时订阅输入 Observables 的最大数量。

Return:

Observable

Observable 发出源值,同时也将投影函数应用于在输出 Observable 上 发出的每个值以得到结果,然后合并这些从转换后得到的 Observables 的结果。

Example:

每次点击开始发出的值都是乘以2的,最多连乘10次
var clicks = Rx.Observable.fromEvent(document, 'click');
var powersOfTwo = clicks
  .mapTo(1)
  .expand(x => Rx.Observable.of(2 * x).delay(1000))
  .take(10);
powersOfTwo.subscribe(x => console.log(x));

Test:

See:

public filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable source

通过只发送源 Observable 的中满足指定 predicate 函数的项来进行过滤。

类似于 Array.prototype.filter(), 它只会发出源 Observable 中符合标准函数的值。

类似于大家所熟知的 Array.prototype.filter 方法,此操作符从源 Observable 中 接收值,将值传递给 predicate 函数并且只发出返回 true 的这些值。

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number): boolean

评估源 Observable 所发出的每个值的函数。如果它返回 true,就发出值,如果是 false 则不会传给输出 Observable 。index 参数是自订阅开始后发送序列的索引,是从 0 开始的。

thisArg any
  • optional

可选参数,用来决定 predicate 函数中的 this 的值。

Return:

Observable

值的 Observable,这些值来自源 Observable 并且 是 predicate 函数所允许的。

Example:

只发出目标是 DIV 元素的点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksOnDivs = clicks.filter(ev => ev.target.tagName === 'DIV');
clicksOnDivs.subscribe(x => console.log(x));

Test:

See:

public find(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T> source

只发出源 Observable 所发出的值中第一个满足条件的值。

找到第一个通过测试的值并将其发出。

find 会查找源 Observable 中与 predicate 函数体现的指定条件匹配的第一项,然后 将其返回。不同于 first,在 findpredicate 是必须的,而且如果没找到 有效的值的话也不会发出错误。

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number, source: Observable<T>): boolean

使用每项来调用的函数,用于测试是否符合条件。

thisArg any
  • optional

可选参数,用来决定 predicate 函数中的 this 的值。

Return:

Observable<T>

符合条件的第一项的 Observable 。

Example:

找到并发出第一个点击 DIV 元素的事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.find(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

Test:

See:

public findIndex(predicate: function(value: T, index: number, source: Observable<T>): boolean, thisArg: any): Observable<T> source

只发出源 Observable 所发出的值中第一个满足条件的值的索引。

它很像 find , 但发出的是找到的值的索引, 而不是值本身。

findIndex 会查找源 Observable 中与 predicate 函数体现的指定条件匹配的第一项,然后 返回其索引(从0开始)。不同于 first,在 findIndexpredicate 是必须的,而且如果没找到 有效的值的话也不会发出错误。

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number, source: Observable<T>): boolean

使用每项来调用的函数,用于测试是否符合条件。

thisArg any
  • optional

可选参数,用来决定 predicate 函数中的 this 的值。

Return:

Observable<T>

符合条件的第一项的索引的 Observable 。

Example:

找到并发出第一个点击 DIV 元素的事件的索引
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.findIndex(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

Test:

See:

public first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R> source

只发出由源 Observable 所发出的值中第一个(或第一个满足条件的值)。

只发出第一个值。或者只发出第一个通过测试的值。

如果不使用参数调用,first 会发出源 Observable 中的第一个值,然后完成。如果使用 predicate 函数来调用,first 会发出源 Observable 第一个满足条件的值。它还可以 接收 resultSelector 函数根据输入值生成输出值,假如在源 Observable 完成前无法发 出一个有效值的话,那么会发出 defaultValue 。如果没有提供 defaultValue 并且也 找不到匹配的元素,则抛出错误。

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number, source: Observable<T>): boolean
  • optional

使用每项来调用的可选函数,用于测试是否符合条件。

resultSelector function(value: T, index: number): R
  • optional

函数,它基于源 Observable 的值和索引来生成输出 Observable 的值。传给这个函数的参数有:

  • value: 在源 Observable 上发出的值。
  • index: 源值的索引。
defaultValue R
  • optional

假如在源 Observable 上没有找到有效值,就会发出这个 默认值。

Return:

Observable<T | R>

符合条件的第一项的 Observable 。

Throw:

EmptyError

如果在 Observable 完成之前还没有发出任何 next 通知的话, 就把 EmptyError 发送给观察者的 error 回调函数。

Example:

只发出第一次点击 DOM 的事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first();
result.subscribe(x => console.log(x));
只发出第一次点击 DIV 元素的事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

Test:

See:

public forEach(next: Function, PromiseCtor: PromiseConstructor): Promise source

Params:

NameTypeAttributeDescription
next Function

observable 发出的每个值的处理器。

PromiseCtor PromiseConstructor
  • optional

用来生成 Promise 的构造函数。

Return:

Promise

一个 observable 完成则 resolves,错误则 rejects 的 promise。

public groupBy(keySelector: function(value: T): K, elementSelector: function(value: T): R, durationSelector: function(grouped: GroupedObservable<K, R>): Observable<any>): Observable<GroupedObservable<K, R>> source

根据指定条件将源 Observable 发出的值进行分组,并将这些分组作为 GroupedObservables 发出,每一个分组都是一个 GroupedObservable

Params:

NameTypeAttributeDescription
keySelector function(value: T): K

提取每项的键的函数。

elementSelector function(value: T): R
  • optional

提取每项返回元素的函数。

durationSelector function(grouped: GroupedObservable<K, R>): Observable<any>
  • optional

返回一个 Observable 来确定每个组应该存在多长时间的函数。

Return:

Observable<GroupedObservable<K, R>>

发出 GroupedObservables 的 Observable, 每个 GroupedObservable 对应唯一的键值,并且会发出源 Observable 中共享该键值的项。

Example:

通过 id 分组并返回数组
Observable.of<Obj>({id: 1, name: 'aze1'},
                   {id: 2, name: 'sf2'},
                   {id: 2, name: 'dg2'},
                   {id: 1, name: 'erg1'},
                   {id: 1, name: 'df1'},
                   {id: 2, name: 'sfqfb2'},
                   {id: 3, name: 'qfs3'},
                   {id: 2, name: 'qsgqsfg2'}
    )
    .groupBy(p => p.id)
    .flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], []))
    .subscribe(p => console.log(p));

// 显示:
// [ { id: 1, name: 'aze1' },
//   { id: 1, name: 'erg1' },
//   { id: 1, name: 'df1' } ]
//
// [ { id: 2, name: 'sf2' },
//   { id: 2, name: 'dg2' },
//   { id: 2, name: 'sfqfb2' },
//   { id: 2, name: 'qsgqsfg2' } ]
//
// [ { id: 3, name: 'qfs3' } ]
以 id 字段为主组装数据
Observable.of<Obj>({id: 1, name: 'aze1'},
                   {id: 2, name: 'sf2'},
                   {id: 2, name: 'dg2'},
                   {id: 1, name: 'erg1'},
                   {id: 1, name: 'df1'},
                   {id: 2, name: 'sfqfb2'},
                   {id: 3, name: 'qfs1'},
                   {id: 2, name: 'qsgqsfg2'}
                  )
    .groupBy(p => p.id, p => p.name)
    .flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], ["" + group$.key]))
    .map(arr => ({'id': parseInt(arr[0]), 'values': arr.slice(1)}))
    .subscribe(p => console.log(p));

// 显示:
// { id: 1, values: [ 'aze1', 'erg1', 'df1' ] }
// { id: 2, values: [ 'sf2', 'dg2', 'sfqfb2', 'qsgqsfg2' ] }
// { id: 3, values: [ 'qfs1' ] }

Test:

public ignoreElements(): Observable source

忽略源 Observable 所发送的所有项,只传递 completeerror 的调用。

Return:

Observable

该 Observable 是空的,只调用 completeerror,调用是基于源 Observable 的调用。

Test:

public isEmpty(): Observable source

如果源 Observable 是空的话,它返回一个发出 true 的 Observable,否则发出 false 。

Return:

Observable

发出布尔值的 Observable 。

Test:

public last(predicate: function): Observable source

返回的 Observable 只发出由源 Observable 发出的最后一个值。它可以接收一个可选的 predicate 函数作为 参数,如果传入 predicate 的话则发送的不是源 Observable 的最后一项,而是发出源 Observable 中 满足 predicate 函数的最后一项。

Params:

NameTypeAttributeDescription
predicate function

任何由源 Observable 发出的项都必须满足的条件函数。

Return:

Observable

该 Observable 只发出源 Observable 中满足给定条件的最后一项, 或者没有任何项满足条件时发出 NoSuchElementException 。

Throw:

EmptyError

如果 Observale 完成前还没有发出任何 next 通知的话,就会 发送 EmptyError 给观察者的 error 回调函数。

*

如果在源 Observable 中没有匹配 predicate 函数的项,则抛出。

Test:

public letProto(func: *): Observable<R> source

Params:

NameTypeAttributeDescription
func *

Return:

Observable<R>

public lift(operator: Operator): Observable source

创建一个新的 Observable,以它作为源,并传递操作符的定义作为新的 observable 操作符。

Params:

NameTypeAttributeDescription
operator Operator

定义了如何操作 observable 的操作符。

Return:

Observable

应用了操作符的新 observable。

public map(project: function(value: T, index: number): R, thisArg: any): Observable<R> source

将给定的 project 函数应用于源 Observable 发出的每个值,并将结果值作为 Observable 发出。

类似于 Array.prototype.map(), 它把每个源值传递给转化函数以获得相应的输出值。

类似于大家所熟知的 Array.prototype.map 方法,此操作符将投射函数应用于每个值 并且在输出 Observable 中发出投射后的结果。

Params:

NameTypeAttributeDescription
project function(value: T, index: number): R

应用于由源 Observable 所发出的每个值的函数。index 参数是自订阅开始后发送序列的索引,是从 0 开始的。

thisArg any
  • optional

可选参数,定义在 project 函数中的 this 是什么。

Return:

Observable<R>

该 Observable 发出源 Observable 中经过给定的 project 函数转换的值。

Example:

将每次点击映射为这次点击的 clientX
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));

Test:

See:

public mapTo(value: any): Observable source

每次源 Observble 发出值时,都在输出 Observable 上发出给定的常量值。

类似于 map,但它每一次都把源值映射成同一个输出值。

接收常量 value 作为参数,并每当源 Observable 发出值时都发出这个值。换句话说, 就是忽略实际的源值,然后简单地使用这个发送时间点以知道何时发出给定的 value

Params:

NameTypeAttributeDescription
value any

将每个源值映射成的值。

Return:

Observable

该 Observable 在每次源 Observable 发出值的时候发出给定 的 value

Example:

把每次点击映射成字符串 'Hi'
var clicks = Rx.Observable.fromEvent(document, 'click');
var greetings = clicks.mapTo('Hi');
greetings.subscribe(x => console.log(x));

Test:

See:

public materialize(): Observable<Notification<T>> source

表示源 Observable 中的所有通知,每个通知都会在 Notification 对象中标记为 它们原始的通知类型,并会作为输出 Observable 的 next 通知。

Notification 对象中包装 nexterrorcomplete 发送, 并在输出 Observable 上作为 next 发送出去。

materialize 返回一个 Observable,这个 Observable 会为每个源 Observable 的 nexterrorcomplete 通知发出 next 通知。当源 Observable 发出 complete 时, 输出 Observable 会发出 next 并且 Notification 类型为 "complete",然后它也发出 complete 。当源 Observable 发出 error 时,输出 Observable 会发出 next 并且 Notification 类型为 "error",然后发出 complete

该操作符对于生成源 Observable 的元数据很有用,并作为 next 发送使用掉。 与 dematerialize 结合使用。

Return:

Observable<Notification<T>>

该 Observable 会发出 Notification 对象, 该对象包装了来自源 Observable 的带有元数据的原始通知。

Example:

将一个错误的 Observable 转换成 Notification 类型的 Observable
var letters = Rx.Observable.of('a', 'b', 13, 'd');
var upperCase = letters.map(x => x.toUpperCase());
var materialized = upperCase.materialize();
materialized.subscribe(x => console.log(x));

// 结果如下:
// - Notification {kind: "N", value: "A", error: undefined, hasValue: true}
// - Notification {kind: "N", value: "B", error: undefined, hasValue: true}
// - Notification {kind: "E", value: undefined, error: TypeError:
//   x.toUpperCase is not a function at MapSubscriber.letters.map.x
//   [as project] (http://1…, hasValue: false}

Test:

See:

public max(comparer: Function): Observable source

max 操作符操作的 Observable 发出数字(或可以与提供的函数进行比较的项)并且当源 Observable 完成时它发出单一项:最大值的项。

Params:

NameTypeAttributeDescription
comparer Function
  • optional

可选的比较函数,用它来替代默认值来比较两项的值。

Return:

Observable

该 Observable 发出最大值的项。

Example:

获取一连串数字中的最大值
Rx.Observable.of(5, 4, 7, 2, 8)
  .max()
  .subscribe(x => console.log(x)); // -> 8
使用比较函数来获取最大值的项
interface Person {
  age: number,
  name: string
}
Observable.of<Person>({age: 7, name: 'Foo'},
                      {age: 5, name: 'Bar'},
                      {age: 9, name: 'Beer'})
          .max<Person>((a: Person, b: Person) => a.age < b.age ? -1 : 1)
          .subscribe((x: Person) => console.log(x.name)); // -> 'Beer'
}

Test:

See:

public merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable source

创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中的所有值。

通过把多个 Observables 的值混合到一个 Observable 中 来将其打平。

merge 订阅每个给定的输入 Observable (给定的源或作为参数的 Observable ),然后只是 将所有输入 Observables 的所有值发送(不进行任何转换)到输出 Observable 。所有的输入 Observable 都完成了,输出 Observable 才能完成。任何由输入 Observable 发出的错误都 会立即在输出 Observalbe 上发出。

Params:

NameTypeAttributeDescription
other ObservableInput

可以与源 Observable 合并的输入 Observable 。 可以给定多个输入 Observables 作为参数。

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

可以同时订阅的输入 Observables 的最大数量。

scheduler Scheduler
  • optional
  • default: null

用来管理输入 Observables 的并发性的 调度器。

Return:

Observable

该 Observable 发出的项是每个输入 Observable 的结果。

Example:

合并两个 Observables: 时间间隔为1秒的 timer 和 clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = clicks.merge(timer);
clicksOrTimer.subscribe(x => console.log(x));
合并三个 Observables ,但只能同时运行两个
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // 参数
var merged = timer1.merge(timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));

Test:

See:

public mergeAll(concurrent: number): Observable source

将高阶 Observable 转换成一阶 Observable ,一阶 Observable 会同时发出在内部 Observables 上发出的所有值。

打平高阶 Observable 。

mergeAll 订阅发出 Observables 的 Observalbe ,也称为高阶 Observable 。 每当观察到发出的内部 Observable 时,它会订阅并发出输出 Observable 上的这个 内部 Observable 的所有值。所有的内部 Observable 都完成了,输出 Observable 才能完成。任何由内部 Observable 发出的错误都会立即在输出 Observalbe 上发出。

Params:

NameTypeAttributeDescription
concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

可以同时订阅的输入 Observables 的最大数量。

Return:

Observable

该 Observable 发出的值来自所有由源 Observable 发出的 内部 Observables 。

Example:

为每个点击事件创建一个新的 interval Observable ,并将其输出混合为一个 Observable
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var firstOrder = higherOrder.mergeAll();
firstOrder.subscribe(x => console.log(x));
每次点击都会从0到9计数(每秒计数一次),但只允许最多同时只能有两个计时器
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(10));
var firstOrder = higherOrder.mergeAll(2);
firstOrder.subscribe(x => console.log(x));

Test:

See:

public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable source

将每个源值投射成 Observable ,该 Observable 会合并到输出 Observable 中。

将每个值映射成 Observable ,然后使用 mergeAll 打平所有的内部 Observables 。

返回的 Observable 基于应用一个函数来发送项,该函数提供给源 Observable 发出的每个项, 并返回一个 Observable,然后合并这些作为结果的 Observable,并发出本次合并的结果。

Params:

NameTypeAttributeDescription
project function(value: T, ?index: number): ObservableInput

函数,

    • 当应用于源 Observable 发出的项时,返回一个 Observable 。
resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional
  • 函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。
    • 传递给这个函数参数有:
      • outerValue: 来自源的值
      • innerValue: 来自投射的 Observable 的值
      • outerIndex: 来自源的值的 "index"
      • innerIndex: 来自投射的 Observable 的值的 "index"
concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

可以同时订阅的输入 Observables 的最大数量。

Return:

Observable

该 Observable 发出由源 Observable 发出的每项应用投射函数 (和可选的 resultSelector)后的结果,并合并从该转化获得的 Observables 的结果。

Example:

将每个字母映射并打平成一个 Observable ,每1秒钟一次
var letters = Rx.Observable.of('a', 'b', 'c');
var result = letters.mergeMap(x =>
  Rx.Observable.interval(1000).map(i => x+i)
);
result.subscribe(x => console.log(x));

// 结果如下:
// a0
// b0
// c0
// a1
// b1
// c1
// 继续列出a、b、c加上各自的自增数列

Test:

See:

public mergeMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable source

将每个源值投射成同一个 Observable ,该 Observable 会多次合并到输出 Observable 中。

它很像 mergeMap,但永远将每个值映射到同一个内部 Observable 。

将每个源值映射成给定的 Observable :innerObservable ,而无论源值是什么,然后 将这些结果 Observables 合并到单个的 Observable ,也就是输出 Observable 。

Params:

NameTypeAttributeDescription
innerObservable ObservableInput

用来替换源 Observable 中的每个值 的 Observable 。

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional
  • 函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。
    • 传递给这个函数参数有:
      • outerValue: 来自源的值
      • innerValue: 来自投射的 Observable 的值
      • outerIndex: 来自源的值的 "index"
      • innerIndex: 来自投射的 Observable 的值的 "index"
concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

可以同时订阅的输入 Observables 的最大数量。

Return:

Observable

每次源 Observable 发出值时,该 Observable 发出来自 给定 innerObservable (和通过 resultSelector 的可选的转换)的项。

Example:

对于每次点击事件,都开启一个时间间隔为1秒的 interval Observable
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.mergeMapTo(Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public mergeScan(accumulator: function(acc: R, value: T): Observable<R>, seed: *, concurrent: number): Observable<R> source

在源 Observable 上应用 accumulator 函数,其中 accumulator 函数本身返回 Observable ,然后每个返回的中间 Observable 会被合并到输出 Observable 中。

它很像 scan,但 accumulator 函数返回的 Observables 会被合并到外部 Observalbe 中。

Params:

NameTypeAttributeDescription
accumulator function(acc: R, value: T): Observable<R>

在每个源值上调用的累加器函数。

seed *

初始的累加值。

concurrent number
  • optional
  • default: Number.POSITIVE_INFINITY

可以同时订阅的输入 Observables 的最大数量。

Return:

Observable<R>

累加值的 Observable 。

Example:

点击计数
const click$ = Rx.Observable.fromEvent(document, 'click');
const one$ = click$.mapTo(1);
const seed = 0;
const count$ = one$.mergeScan((acc, one) => Rx.Observable.of(acc + one), seed);
count$.subscribe(x => console.log(x));

// 结果:
1
2
3
4
// ...以此类推,每次点击计数增加1

Test:

public min(comparer: Function): Observable<R> source

min 操作符操作的 Observable 发出数字(或可以使用提供函数进行比较的项)并且当源 Observable 完成时它发出单一项:最小值的项。

Params:

NameTypeAttributeDescription
comparer Function
  • optional

可选的比较函数,用它来替代默认值来比较两项的值。

Return:

Observable<R>

该 Observable 发出最小值的项。

Example:

获取一连串数字中的最小值
Rx.Observable.of(5, 4, 7, 2, 8)
  .min()
  .subscribe(x => console.log(x)); // -> 2
使用比较函数来获取最小值的项
interface Person {
  age: number,
  name: string
}
Observable.of<Person>({age: 7, name: 'Foo'},
                      {age: 5, name: 'Bar'},
                      {age: 9, name: 'Beer'})
          .min<Person>( (a: Person, b: Person) => a.age < b.age ? -1 : 1)
          .subscribe((x: Person) => console.log(x.name)); // -> 'Bar'
}

Test:

See:

public multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable source

返回的 Observable 发出对 ConnectableObservable 发出的项调用一个指定的 selector 函数的结果, ConnectableObservable 可以在潜在的多个流之间共享单个 subscription 。

Params:

NameTypeAttributeDescription
subjectOrSubjectFactory Function | Subject

用来创建中间 Subject 的工厂函数,源序列的元素将通过 该 Subject 多播到 selector函数,或者将元素推入该 Subject 。

selector Function
  • optional

可选的选择器函数,可以根据需要多次使用以多播源流,而不会导致源流 生成多个 subscriptions 。给定源的订阅者会从订阅开始的一刻起,接收源的所有通知。

Return:

Observable

该 Observable 发出对 ConnectableObservable 发出的项调用 selector 函数的结果, ConnectableObservable 可以在潜在的多个流之间共享单个 subscription 。

Test:

public observeOn(scheduler: IScheduler, delay: number): Observable<T> source

使用指定的调度器来重新发出源 Observable 的所有通知。

确保从 Observable 的外部使用特定的调度器。

observeOn 操作符接收一个 scheduler 作为第一个参数,它将用于重新安排源 Observable 所发送的通知。如果你不能控制 给定 Observable 的内部调度器,但是想要控制何时发出值,那么这个操作符可能是有用的。

返回的 Observable 发出与源 Observable 相同的通知(nextcompleteerror),但是使用提供的调度器进行了重新安排。 注意,这并不意味着源 Observables 的内部调度器会以任何形式被替换。原始的调度器仍然会被使用,但是当源 Observable 发出 通知时,它会立即重新安排(这时候使用传给 observeOn 的调度器)。在同步地发出大量的值的 Observalbe 上调用 observeOn 是一种反模式,这会将 Observable 的发送分解成异步块。为了实现这一点,调度器必须直接传递给源 Observable (通常是创建它的操作符)。 observeOn 只是简单地像通知延迟一些,以确保这些通知在预期的时间点发出。

事实上,observeOn 接收第二个参数,它以毫秒为单位指定延迟通知的发送时间。observeOndelay 操作符最主要的区别是它会延迟所有通知,包括错误通知,而 delay 会当源 Observable 发出错误时立即通过错误。 通常来说,对于想延迟流中的任何值,强烈推荐使用 delay 操作符,而使用 observeOn 时,用来指定应该使用 哪个调度器来进行通知发送。

Params:

NameTypeAttributeDescription
scheduler IScheduler

用于重新安排源 Observable 的通知的调度器。

delay number
  • optional

应该重新安排的每个通知的延迟时间的毫秒数。

Return:

Observable<T>

该 Observable 发出与源 Observale 同样的通知,但是使用了提供的调度器。

Example:

确保在浏览器重绘前调用订阅中的值。
const intervals = Rx.Observable.interval(10); // 默认情况下,interval 使用异步调度器进行调度

intervals
.observeOn(Rx.Scheduler.animationFrame)       // 但我们将在 animationFrame 调度器上进行观察,
.subscribe(val => {                           // 以确保动画的流畅性。
  someDiv.style.height = val + 'px';
});

Test:

See:

public onErrorResumeNext(observables: ...ObservableInput): Observable source

当任何提供的 Observable 发出完成或错误通知时,它会立即地订阅已传入下一个 Observable 。

无论发生什么,都会执行一系列的 Observables ,即使这意味着要吞咽错误。

onErrorResumeNext 操作符接收一系列的 Observables ,可与直接作为参数或数组提供。如果没提供 Observable , 返回的 Observable 与源 Observable 的行为是相同的。

onErrorResumeNext 返回的 Observable 通过订阅和重新发出源 Observable 的值开始。当流的值完成时,无论 Observable 是完成还是发出错误,onErrorResumeNext 都会订阅作为参数传给该方法的第一个 Observable 。它也会开始重新发出它的值, 再一次,当流完成时,onErrorResumeNext 又会继续订阅已提供的系列 Observable 中另一个,无论前一个 Observable 是否完成或发生错误。这样的行为会持续到系列中没有更多的 Observable ,返回的 Observale 将在此时完成,即使最后订阅的 流是以错误结束的。

因此, onErrorResumeNext 可以认为是某个版本的 concat 操作符,只是当它的输入 Observables 发生 错误时,它更为宽容。然而,concat 只有当前一个 Observable 成功完成了,它才会订阅系列中的下个 Observable , 而 onErrorResumeNext 即使是以错误完成某个 Observalbe 时,它也会订阅下一个。

注意,对于由 Observables 发出的错误,你无法获得访问权限。特别是不要指望可以将这些出现在错误回调函数中的 错误传递给 subscribe 。如果要根据 Observable 发出的错误采取特定的操作,应该尝试使用 catch

Params:

NameTypeAttributeDescription
observables ...ObservableInput

传入的 Observables,可以直接传入,也可以作为数组传入。

Return:

Observable

该 Observable 发出源 Observable 的值,但如果发出错误,它会订阅下一个传入的 Observable ,并以此类推,直到它完成或用完所有的 Observable 。

Example:

在 map 操作失败后订阅下一个 Observable
Rx.Observable.of(1, 2, 3, 0)
  .map(x => {
      if (x === 0) { throw Error(); }
return 10 / x;
  })
  .onErrorResumeNext(Rx.Observable.of(1, 2, 3))
  .subscribe(
    val => console.log(val),
    err => console.log(err),          // 永远不会调用
    () => console.log('that\'s it!')
  );

// 输出:
// 10
// 5
// 3.3333333333333335
// 1
// 2
// 3
// "that's it!"

See:

public pairwise(): Observable<Array<T>> source

将一系列连续的发送成对的组合在一起,并将这些分组作为两个值的数组发出。

将当前值和前一个值作为数组放在一起,然后将其发出。

源 Observable 的第N个发送会使输出 Observable 发出一个数组 [(N-1)th, Nth],即前一个 值和当前值的数组,它们作为一对。出于这个原因,pairwise 发出源 Observable 的 第二个和随后的发送,而不发送第一个,因为它没有前一个值。

Return:

Observable<Array<T>>

该 Observabale 为源 Observable 的 成对的连续值(数组)。

Example:

每次点击(从第二次开始),都会发出与前一次点击的相对距离
var clicks = Rx.Observable.fromEvent(document, 'click');
var pairs = clicks.pairwise();
var distance = pairs.map(pair => {
  var x0 = pair[0].clientX;
  var y0 = pair[0].clientY;
  var x1 = pair[1].clientX;
  var y1 = pair[1].clientY;
  return Math.sqrt(Math.pow(x0 - x1, 2) + Math.pow(y0 - y1, 2));
});
distance.subscribe(x => console.log(x));

Test:

See:

public partition(predicate: function(value: T, index: number): boolean, thisArg: any): [Observable<T>, Observable<T>] source

将源 Observable 一分为二,一个是所有满足 predicate 函数的值,另一个是所有 不满足 predicate 的值。

它很像 filter,但是返回两个 Observables : 一个像 filter 的输出, 而另一个是所有不符合条件的值。

partition 输出有两个 Observables 的数组,这两个 Observables 是通过给定的 predicate 函数将源 Observable 的值进行划分得到的。该数组的第一个 Observable 发出 predicate 参数 返回 true 的源值。第二个 Observable 发出 predicate 参数返回 false 的源值。第一个像是 filter ,而第二个像是 predicate 取反的 filter

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number): boolean

评估源 Observable 所发出的每个值的函数。如果它返回 true ,那么发出的值就在返回的数组中的第一个 Observable 中,如果返回的是 false ,那么发出的值就在返回的数组的第二个 Observable 中。index 参数是自订阅开始后发送序列的索引,是从 0 开始的。

thisArg any
  • optional

可选参数,用来决定 predicate 函数中的 this 的值。

Return:

[Observable<T>, Observable<T>]

有两个 Observables 的数组: 一个是通过 predicate 函数的所有值,另一个是没有通过 predicate 的所有值。

Example:

将点击事件划分为点击 DIV 元素和点击其他元素
var clicks = Rx.Observable.fromEvent(document, 'click');
var parts = clicks.partition(ev => ev.target.tagName === 'DIV');
var clicksOnDivs = parts[0];
var clicksElsewhere = parts[1];
clicksOnDivs.subscribe(x => console.log('DIV clicked: ', x));
clicksElsewhere.subscribe(x => console.log('Other clicked: ', x));

Test:

See:

public pluck(properties: ...string): Observable source

将每个源值(对象)映射成它指定的嵌套属性。

类似于 map,但仅用于选择每个发出对象的某个嵌套属性。

给定描述对象属性路径的字符串的列表,然后从源 Observable 中的所有值中检索指定嵌套 属性的值。如果属性无法解析,它会返回 undefined

Params:

NameTypeAttributeDescription
properties ...string

从每个源值(对象啊)中提取的嵌套属性。

Return:

Observable

全新的 Observable,它发出源自源值的属性值。

Example:

将每次点击映射成点击的 target 元素的 tagName 属性
var clicks = Rx.Observable.fromEvent(document, 'click');
var tagNames = clicks.pluck('target', 'tagName');
tagNames.subscribe(x => console.log(x));

Test:

See:

public publish(selector: Function): * source

返回 ConnectableObservable,它是 Observable 的变种,它会一直等待,直到 connnect 方法被调用才会开始把值发送给那些订阅它的观察者。

Params:

NameTypeAttributeDescription
selector Function
  • optional

可选的选择器函数,可以根据需要多次使用以多播源序列,而不会导致源序列 生成多个 subscriptions 。给定源的订阅者会从订阅开始的一刻起,接收源的所有通知。

Return:

*

ConnectableObservable,一旦连接,源 Observable 便会向它的观察者发出项。

Test:

public publishBehavior(value: *): ConnectableObservable<T> source

Params:

NameTypeAttributeDescription
value *

Test:

public publishReplay(bufferSize: *, windowTime: *, scheduler: *): ConnectableObservable<T> source

Params:

NameTypeAttributeDescription
bufferSize *
windowTime *
scheduler *

Test:

public race(): Observable source

返回 Observable,该 Observable 是源 Observable 和提供的 Observables 的组合中 第一个发出项的 Observable 的镜像。

Params:

NameTypeAttributeDescription
...observables ...Observables

用于竞争的 Observables 源,以比试哪个 Observable 会首先发出项。

Return:

Observable

该 Observable 是第一个发出项的 Observable 输出镜像。

Test:

public reduce(accumulator: function(acc: R, value: T, index: number): R, seed: R): Observable<R> source

在源 Observalbe 上应用 accumulator (累加器) 函数,然后当源 Observable 完成时,返回 累加的结果,可以提供一个可选的 seed 值。

使用 accumulator (累加器) 函数将源 Observable 所发出的所有值归并在一起, 该函数知道如何将新的源值纳入到过往的累加结果中。

类似于 Array.prototype.reduce()reduce 可以对累加值和源 Observable (过去的)的每个值应用 accumulator 函数, 然后将其归并成一个值并且在输出 Observable 上发出。注意,reduce 只会发出一个值, 并且是当源 Observable 完成时才发出。它等价于使用 scan 操作符后面再跟 last 操作符。

返回的 Observable 为由源 Observable 发出的每项应用指定的 accumulator 函数。 如果指定了 seed 值,那么这个值会作为 accumulator 函数的初始值。如果没有指定 seed 值,那么源中的第一项会作为 seed 来使用。

Params:

NameTypeAttributeDescription
accumulator function(acc: R, value: T, index: number): R

调用每个 源值的累加器函数。

seed R
  • optional

初始累加值。

Return:

Observable<R>

该 Observable 发出单个值,这个值是由源 Observable 发出值累加的结果。

Example:

计算5秒内发生的点击次数
var clicksInFiveSeconds = Rx.Observable.fromEvent(document, 'click')
  .takeUntil(Rx.Observable.interval(5000));
var ones = clicksInFiveSeconds.mapTo(1);
var seed = 0;
var count = ones.reduce((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));

Test:

See:

public repeat(count: number): Observable source

返回的 Observable 重复由源 Observable 所发出的项的流,最多可以重复 count 次。

Params:

NameTypeAttributeDescription
count number
  • optional

源 Observable 项重复的次数,如果 count 为0则产生一个空的 Observable 。

Return:

Observable

该 Observable 重复由源 Observable 所发出的项的流,最多可以重复 count 次。

Test:

public repeatWhen(notifier: function(notifications: Observable): Observable): Observable source

返回的 Observalbe 是源 Observable 的镜像,除了 complete 。如果源 Observable 调用了 complete,这个方法会发出给 notifier 返回的 Observable 。如果这个 Observale 调用了 completeerror,那么这个方法会在子 subscription 上调用 completeerror 。否则,此方法将重新订阅源 Observable。

Params:

NameTypeAttributeDescription
notifier function(notifications: Observable): Observable

接收 Observable 的通知,用户可以该通知 的 completeerror 来中止重复。

Return:

Observable

使用重复逻辑修改过的源 Observable 。

Test:

public retry(count: number): Observable source

返回一个 Observable, 该 Observable 是源 Observable 不包含错误异常的镜像。 如果源 Observable 发生错误, 这个方法不会传播错误而是会不 断的重新订阅源 Observable 直到达到最大重试次数 (由数字参数指定)。

任何所有被源 Observable 发出的数据项都会被做为结果的 Observable 发出, 即使这些发送是在失败的订阅期间。 举个例子, 如果一个 Observable 第一次发送[1, 2]后失败了,紧接着第二次成功的发出: [1, 2, 3, 4, 5]后触发完成, 最后发送流和通知为: [1, 2, 1, 2, 3, 4, 5, complete]。

Params:

NameTypeAttributeDescription
count number

在失败之前重试的次数。

Return:

Observable

使用重试逻辑修改过的源 Observable。

Test:

public retryWhen(notifier: function(errors: Observable): Observable): Observable source

返回一个 Observable, 该 Observable 是源 Observable 不包含错误异常的镜像。 如果源头 Observable 触发 error, 这个方法会发出引起错误的 Throwable 给 notifier 返回的 Observable。 如果该 Observable 触发 complete 或者 error 则该方法会使子订阅触发 completeerror。 否则该方法会重新订阅源 Observable。

Params:

NameTypeAttributeDescription
notifier function(errors: Observable): Observable

接受一个用户可以complete 或者 error的通知型 Observable, 终止重试。

Return:

Observable

使用重试逻辑修改过的源 Observable。

Test:

public sample(notifier: Observable<any>): Observable<T> source

发出源 Observable 最新发出的值当另一个 notifier Observable发送时。

就像是 sampleTime, 但是无论何时notifier Observable 进行了发送都会去取样。

无论何时 notifier Observable 发出一个值或者完成, sample 会去源 Observable 中发送上次 取样后源 Observable 发出的最新值, 除非源在上一次取样后没有发出值。 notifier会被订阅只要输出 Observable 被订阅。

Params:

NameTypeAttributeDescription
notifier Observable<any>

被用来取样的源 Observable。

Return:

Observable<T>

Observable,该 Observable 发出当通知 Observable 发出值或者完成时从源 Observable 取样的最新值。

Example:

每次点击, 取样最新的 "seconds" 时间器
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = seconds.sample(clicks);
result.subscribe(x => console.log(x));

Test:

See:

public sampleTime(period: number, scheduler: Scheduler): Observable<T> source

在周期时间间隔内发出源 Observable 发出的最新值。

在周期时间间隔内取样源 Observable , 发出取样的。

sampleTime 周期性的查看源 Observable 并且发出上次取样后发出的最新的值, 除非上次取样后 就没有再发出数据了。 取样在每个周期毫秒(或者时间单位由可选的调度器参数决定)内定期发生。 只要 输出 Observable 被订阅取样就开始。

Params:

NameTypeAttributeDescription
period number

用毫秒或者由可选的调度器参数决定的时间单位表示的取样周期。

scheduler Scheduler
  • optional
  • default: async

IScheduler用来管理取样的时间。

Return:

Observable<T>

Observable,该 Observable 发出特定的时间周期从源 Observable 取样的最新值。

Example:

每秒, 发出最近的一个点击
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.sampleTime(1000);
result.subscribe(x => console.log(x));

Test:

See:

public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R> source

对源 Observable 使用累加器函数, 返回生成的中间值, 可选的初始值。

就想是 reduce, 但是发出目前的累计数当源发出数据的时候。

将所有源发出的数据结合起来, 使用一个累加器函数,该函数知道如何将新的值加入到累加器中。 这就像是reduce, 但是会发出中间的累加值。

返回一个 Observable, 该 Observable 对每个源 Observable 发出的值使用特定的累加器。 如果seed值提供了, 这个值会被累加器用作初始值。 如果seed值没有被提供, 源数据的第一项会被当做初始值。

Params:

NameTypeAttributeDescription
accumulator function(acc: R, value: T, index: number): R

对每个源数据调用的累加器函数。

seed T | R
  • optional

初始值。

Return:

Observable<R>

带有累加功能的observable。

Example:

计数点击次数
var clicks = Rx.Observable.fromEvent(document, 'click');
var ones = clicks.mapTo(1);
var seed = 0;
var count = ones.scan((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));

Test:

See:

public sequenceEqual(compareTo: Observable, comparor: function): Observable source

使用可选的比较函数,按顺序比较两个 Observables 的所有值,然后返回单个布尔值的 Observable, 以表示两个序列是否相等。 按顺序检查两个 Observables 所发出的所有值是否相等。

sequenceEqual 订阅两个 observables 并且缓冲每个 observable 发出的值。 当任何一个 observable 发出数据, 该值会被缓冲 并且缓冲区从底部向上移动和比较; 如果任何一对值不匹配, 返回的 observable 会发出 false 和完成。 如果其中一个 observables 完 成了, 操作符会等待另一个 observable 完成; 如果另一个 observable 在完成之前又发出了数据, 返回 observable 会发出 false 和完 成。 如果其中一个 observable 永远不会完成或者在另一个完成后还发出数据, 返回的 observable 永远不会结束。

Params:

NameTypeAttributeDescription
compareTo Observable

用来与源 Observable 进行比较的 Observable 序列。

comparor function
  • optional

用来比较每一对值的比较函数。

Return:

Observable

该 Observable 发出单个布尔值,该布尔值表示两个 Observables 所发出的值是否依次相等。

Example:

指出 Konami 码是否匹配
var code = Rx.Observable.from([
 "ArrowUp",
 "ArrowUp",
 "ArrowDown",
 "ArrowDown",
 "ArrowLeft",
 "ArrowRight",
 "ArrowLeft",
 "ArrowRight",
 "KeyB",
 "KeyA",
 "Enter" // no start key, clearly.
]);

var keys = Rx.Observable.fromEvent(document, 'keyup')
 .map(e => e.code);
var matches = keys.bufferCount(11, 1)
 .mergeMap(
   last11 =>
     Rx.Observable.from(last11)
       .sequenceEqual(code)
  );
matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));

Test:

See:

public share(): Observable<T> source

返回一个新的 Observable,该 Observable 多播(共享)源 Observable。 至少要有一个订阅者,该 Observable 才会被订阅并发出数据。 当所有的订阅者都取消订阅了,它会取消对源 Observable 的订阅。 因为 Observable 是多路传播的它使得流是 hot。 它是 `.publish().refCount()` 的别名。

Return:

Observable<T>

Observable,连接该 Observable 后会导致源 Observable 向它的观察者发送数据。

Test:

public shareReplay(bufferSize: *, windowTime: *, scheduler: *): * source

Params:

NameTypeAttributeDescription
bufferSize *
windowTime *
scheduler *

Return:

*

Test:

public single(predicate: Function): Observable<T> source

该 Observable 发出源 Observable 所发出的值中匹配指定 predicate 函数的单个项。 如果源 Observable 发出多于1个数据项或者没有发出数据项, 分别以 IllegalArgumentException 和 NoSuchElementException 进行通知。

Params:

NameTypeAttributeDescription
predicate Function

断言函数,用来评估源 Observable 的数据项。

Return:

Observable<T>

该 Observable 发出源 Observable 所发出的值中匹配指定 predicate 函数的单个项。 .

Throw:

EmptyError

如果 Observable 在发送任何 next 通知之前完成的话,则发送 EmptyError 给观察者的 error 回调函数。

Test:

public skip(count: Number): Observable source

返回一个 Observable, 该 Observable 跳过源 Observable 发出的前N个值(N = count)。

Params:

NameTypeAttributeDescription
count Number

由源 Observable 所发出项应该被跳过的次数。

Return:

Observable

跳过源 Observable 发出值的 Observable。

Test:

public skipLast(count: number): Observable<T> source

跳过源 Observable 最后发出的的N个值 (N = count)。

skipLast 返回一个 Observable,该 Observable 累积足够长的队列以存储最初的N个值 (N = count)。 当接收到更多值时,将从队列的前面取值并在结果序列上产生。 这种情况下值会被延时。

Params:

NameTypeAttributeDescription
count number

源 Observable 中从后往前要跳过的值的数量。

Return:

Observable<T>

Observable 跳过源 Observable 发出 的最后几个值。

Throw:

ArgumentOutOfRangeError

当使用 skipLast(i) 时, 如果i < 0,则 抛出 ArgumentOutOrRangeError。

Example:

跳过有多个值的 Observable 的最后2个值
var many = Rx.Observable.range(1, 5);
var skipLastTwo = many.skipLast(2);
skipLastTwo.subscribe(x => console.log(x));

// Results in:
// 1 2 3

See:

public skipUntil(notifier: Observable): Observable<T> source

返回一个 Observable,该 Observable 会跳过源 Observable 发出的值直到第二个 Observable 开始发送。

Params:

NameTypeAttributeDescription
notifier Observable

第二个 Observable ,它发出后,结果 Observable 开始镜像源 Observable 的元素。

Return:

Observable<T>

Observable 跳过源 Observable 发出的值直到第二个 Observable 开始发送, 然后发出剩下的数据项。

Test:

public skipWhile(predicate: Function): Observable<T> source

返回一个 Observable, 该 Observable 会跳过由源 Observable 发出的所有满足指定条件的数据项, 但是一旦出现了不满足条件的项,则发出在此之后的所有项。

Params:

NameTypeAttributeDescription
predicate Function

函数,用来测试源 Observable 发出的每个数据项。

Return:

Observable<T>

Observable,当指定的 predicate 函数返回 false 时,该 Observable 开始发出由源 Observable 发出的项。

Test:

public startWith(values: ...T, scheduler: Scheduler): Observable source

返回的 Observable 会先发出作为参数指定的项,然后再发出由源 Observable 所发出的项。

Params:

NameTypeAttributeDescription
values ...T

你希望修改过的 Observable 可以先发出的项。

scheduler Scheduler
  • optional

用于调度 next 通知发送的 IScheduler

Return:

Observable

该 Observable 发出指定的 Iterable 中的项,然后发出由源 Observable 所发出的项。

Test:

public subscribe(observerOrNext: Observer | Function, error: Function, complete: Function): ISubscription source

调用 Observable 的执行并注册 Observer 的处理器以便于发出通知。

当你拥有这些 Observables 却仍然什么也没发生时使用它。

subscribe 不是一个常规的操作符,而是方法,它调用 Observables 内部的 subscribe 函数。它也许是一个你传递给 create 静态工厂的方法,但是大多数情况下它是一个库的实现,它定义了 Observable 什么时候发出,发出什么。这意味着实际上是 Observable 开始工 作的那一刻才调用 subscribe , 而不是像人们经常认为的那样,即创建 Observable 的时候。

除了开始 Observable 的执行,该方法允许你监听 Observable 发出的值,也包括完成或者发生错误。你可以通过以下两种方式达到 这种目的。

第一种方式是创建一个实现了 Observer 接口的对象。它应该实现接口定义的方法,但是要注意的是它仅仅是一个普通的 JavaScript 对象,你可以用任何你想要的方式创建(ES6 class, 常见的构造函数, 对象字面量等等)。 特别地,不要尝试使用任何 RxJS 的实现细节去创建 Observers-你不需要这样做。 同样要记住,你的对象不需要实现所有的方法。如果你发现自己创建一个不做任何事情的方法,你可以简化它。 不过要注意,如果 error 方法没用被提供,所有的错误都不会被捕获。

第二种方式是放弃 Observer 对象,只需提供回调函数来替代它的方法。这意味你可以给 subscribe 提供3个方法作为参数, 第一个回调 等价于 next 方法,第二个等价于 error 方法,第三个等价于 complete 方法。就如同 Observer 一样,如果你不需要监听某中某个,你可以 省略该函数,通过传递 undefined 或者 null,因为 subscribe 可以通过在函数调用中的位置识别了这些函数。提到 error 函数,正如上文所述, 如果没有提供,Observable 发出的错误会被抛弃。

不管你使用了哪种方式调用 subscribe,所有情况都返回 Subscription 对象。该对象允许你调用 unsubscribe,该方法会停止 Observable 的工作并且清理 Observable 持有的资源。注意,取消订阅不会调用 subscribe 提供的 complete 回调函数,complete 回调函数是为来自 Observable 的常规完成信号保留的。

记住,提供给 subscribe 的回调函数无法保证是被异步地调用。是 Observable 自身决定这些方法的执行。例如:of 默认同步地发出所有的值。经常查看文档以确认给定的 Observable 被订阅时的行为是怎样的,以及它的默认行为是否可以通过使用 Scheduler 进行更改。

Params:

NameTypeAttributeDescription
observerOrNext Observer | Function

[可选] 或者是 observer 对象, 或者是1个到3个处理器,处理已订阅的 Observable 发出的值。

error Function

[可选] 由错误导致的终结事件的处理器。如果没有提供处理器,错误将不做处理直接抛弃。

complete Function

[可选] 由成功完成导致的终结事件的处理器。

Return:

ISubscription

注册处理程序的订阅引用。

Example:

用 Observer 对象订阅
const sumObserver = {
  sum: 0,
  next(value) {
    console.log('Adding: ' + value);
    this.sum = this.sum + value;
  },
  error() { // We actually could just remote this method,
  },        // since we do not really care about errors right now.
  complete() {
    console.log('Sum equals: ' + this.sum);
  }
};

Rx.Observable.of(1, 2, 3) // 同步发出 1, 2, 3,然后完成。
.subscribe(sumObserver);

// 日志:
// "Adding: 1"
// "Adding: 2"
// "Adding: 3"
// "Sum equals: 6"
用函数订阅
let sum = 0;

Rx.Observable.of(1, 2, 3)
.subscribe(
  function(value) {
    console.log('Adding: ' + value);
    sum = sum + value;
  },
  undefined,
  function() {
    console.log('Sum equals: ' + sum);
  }
);

// 日志:
// "Adding: 1"
// "Adding: 2"
// "Adding: 3"
// "Sum equals: 6"
取消订阅
const subscription = Rx.Observable.interval(1000).subscribe(
  num => console.log(num),
  undefined,
  () => console.log('completed!') // 即使当取消订阅时,也不会被调用
);


setTimeout(() => {
  subscription.unsubscribe();
  console.log('unsubscribed!');
}, 2500);

// Logs:
// 0 after 1s
// 1 after 2s
// "unsubscribed!" after 2,5s

public subscribeOn(scheduler: Scheduler): Observable<T> source

使用指定的 IScheduler 异步地订阅此 Observable 的观察者。

Params:

NameTypeAttributeDescription
scheduler Scheduler

执行 subscription 操作的 IScheduler 。

Return:

Observable<T>

修改过的源 Observable 以便它的 subscriptions 发生在指定的 IScheduler 上。 .

Test:

public switch(): Observable<T> source

通过只订阅最新发出的内部 Observable ,将高阶 Observable 转换成一阶 Observable 。

一旦有新的内部 Observable 出现,通过丢弃前一个,将 高级 Observable 打平。

switch 订阅发出 Observables 的 Observable,也就是高阶 Observable 。 每次观察到这些已发出的内部 Observables 中的其中一个时,输出 Observable 订阅 这个内部 Observable 并开始发出该 Observable 所发出的项。到目前为止, 它的行为就像 mergeAll 。然而,当发出一个新的内部 Observable 时, switch 会从先前发送的内部 Observable 那取消订阅,然后订阅新的内部 Observable 并开始发出它的值。后续的内部 Observables 也是如此。

Return:

Observable<T>

该 Observable 发出由源 Observable 最新发出的 Observable 所发出的项。

Example:

每次点击返回一个 interval Observable
var clicks = Rx.Observable.fromEvent(document, 'click');
// 每次点击事件都会映射成间隔1秒的 interval Observable
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var switched = higherOrder.switch();
// 结果是 `switched` 本质上是一个每次点击时会重新启动的计时器。
// 之前点击产生的 interval Observables 不会与当前的合并。
switched.subscribe(x => console.log(x));

Test:

See:

public switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

将每个源值投射成 Observable,该 Observable 会合并到输出 Observable 中, 并且只发出最新投射的 Observable 中的值。

将每个值映射成 Observable ,然后使用 switch 打平所有的内部 Observables 。

返回的 Observable 基于应用一个函数来发送项,该函数提供给源 Observable 发出的每个项, 并返回一个(所谓的“内部”) Observable 。每次观察到这些内部 Observables 的其中一个时, 输出 Observable 将开始发出该内部 Observable 所发出的项。当发出一个新的内部 Observable 时,switchMap 会停止发出先前发出的内部 Observable 并开始发出新的内部 Observable 的值。后续的内部 Observables 也是如此。

Params:

NameTypeAttributeDescription
project function(value: T, ?index: number): ObservableInput

函数, 当应用于源 Observable 发出的项时,返回一个 Observable 。

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。 传递给这个函数参数有:

  • outerValue: 来自源的值
  • innerValue: 来自投射的 Observable 的值
  • outerIndex: 来自源的值的 "index"
  • innerIndex: 来自投射的 Observable 的值的 "index"

Return:

Observable

该 Observable 发出由源 Observable 发出的每项应用投射函数 (和可选的 resultSelector)后的结果,并只接收最新投射的内部 Observable 的值。

Example:

每次点击返回一个 interval Observable
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.switchMap((ev) => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public switchMapTo(innerObservable: ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable source

将每个源值投射成同一个 Observable ,该 Observable 会使用 switch 多次被打平 到输出 Observable 中。

它很像 switchMap,但永远将每个值映射到同一个内部

    • Observable 。

将每个源值映射成给定的 Observable :innerObservable ,而无论源值是什么,然后 将这些结果 Observables 合并到单个的 Observable ,也就是输出 Observable 。 输出 Observables 只会发出 innerObservable 实例最新发出的值。

Params:

NameTypeAttributeDescription
innerObservable ObservableInput

用来替换源 Observable 中的每个值 的 Observable 。

resultSelector function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any
  • optional

函数,它用于产生基于值的输出 Observable 和源(外部)发送和内部 Observable 发送的索引。 传递给这个函数参数有:

  • outerValue: 来自源的值
  • innerValue: 来自投射的 Observable 的值
  • outerIndex: 来自源的值的 "index"
  • innerIndex: 来自投射的 Observable 的值的 "index"

Return:

Observable

每次源 Observable 发出值时,该 Observable 发出来自 给定 innerObservable (和通过 resultSelector 的可选的转换)的项, 并只接收最新投射的内部 Observable 的值。

Example:

每次点击返回一个 interval Observable
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.switchMapTo(Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public take(count: number): Observable<T> source

只发出源 Observable 最初发出的的N个值 (N = count)。

接收源 Observable 最初的N个值 (N = count),然后完成。

take 返回的 Observable 只发出源 Observable 最初发出的的N个值 (N = count)。 如果源发出值的数量小于 count 的话,那么它的所有值都将发出。然后它便完成,无论源 Observable 是否完成。

Params:

NameTypeAttributeDescription
count number

发出 next 通知的最大次数。

Return:

Observable<T>

该 Observable 只发出源 Observable 最初发出的的N个值 (N = count), 或者发出源 Observable 的所有值,如果源发出值的数量小于 count 的话。

Throw:

ArgumentOutOfRangeError

当使用 take(i) 时,如果 i < 0, 它会发送 ArgumentOutOrRangeError 给观察者的 error 回调函数。

Example:

获取时间间隔为1秒的 interval Observable 的最初的5秒
var interval = Rx.Observable.interval(1000);
var five = interval.take(5);
five.subscribe(x => console.log(x));

Test:

See:

public takeLast(count: number): Observable<T> source

只发出源 Observable 最后发出的的N个值 (N = count)。

记住源 Observable 的最后N个值 (N = count),然后只有当 它完成时发出这些值。

takeLast 返回的 Observable 只发出源 Observable 最后发出的的N个值 (N = count)。 如果源发出值的数量小于 count 的话,那么它的所有值都将发出。此操作符必须等待 源 Observable 的 complete 通知发送才能在输出 Observable 上发出 next 值, 因为不这样的话它无法知道源 Observable 上是否还有更多值要发出。出于这个原因, 所有值都将同步发出,然后是 complete 通知。

Params:

NameTypeAttributeDescription
count number

从源 Observable 的值序列的末尾处,要发出的值的最大数量。

Return:

Observable<T>

该 Observable 只发出源 Observable 最后发出的的N个值 (N = count), 或者发出源 Observable 的所有值,如果源发出值的数量小于 count 的话。

Throw:

ArgumentOutOfRangeError

当使用 takeLast(i) 时,如果 i < 0, 它会发送 ArgumentOutOrRangeError 给观察者的 error 回调函数。

Example:

获取有多个值的 Observable 的最后3个值
var many = Rx.Observable.range(1, 100);
var lastThree = many.takeLast(3);
lastThree.subscribe(x => console.log(x));

Test:

See:

public takeUntil(notifier: Observable): Observable<T> source

发出源 Observable 发出的值,直到 notifier Observable 发出值。

它发出源 Observable 的值,然后直到第二个 Observable (即 notifier )发出项,它便完成。

takeUntil 订阅并开始镜像源 Observable 。它还监视另外一个 Observable,即你 提供的 notifier 。如果 notifier 发出值或 complete 通知,那么输出 Observable 停止镜像源 Observable ,然后完成。

Params:

NameTypeAttributeDescription
notifier Observable

该 Observable 第一次发出值会使 takeUntil 的 输出 Observable 停止发出由源 Observable 所发出的值。

Return:

Observable<T>

该 Observable 发出源 Observable 所发出的值,直到某个 时间点 notifier 发出它的第一个值。

Example:

每秒都发出值,直到第一次点击发生
var interval = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = interval.takeUntil(clicks);
result.subscribe(x => console.log(x));

Test:

See:

public takeWhile(predicate: function(value: T, index: number): boolean): Observable<T> source

发出在源 Observable 中满足 predicate 函数的每个值,并且一旦出现不满足 predicate 的值就立即完成。

只要当通过给定的条件时才接收源 Observable 的值。 当第一个不满足条件的值出现时,它便完成。

takeWhile 订阅并开始镜像源 Observable 。每个源 Observable 发出的值都会传给 predicate 函数,它会返回源值是否满足条件的布尔值。输出 Observable 会发出源值, 直到某个时间点 predicate 返回了 false,此时 takeWhile 会停止镜像源 Observable 并且完成输出 Observable 。

Params:

NameTypeAttributeDescription
predicate function(value: T, index: number): boolean

评估源 Observable 所发出值的函数并返回布尔值。还接收 index(从0开始) 作为第二个参数。

Return:

Observable<T>

只要每个值满足 predicate 函数所定义的条件,那么该 Observable 就会从源 Observable 中发出值,然后完成。

Example:

只有当 clientX 属性大于200时才发出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.takeWhile(ev => ev.clientX > 200);
result.subscribe(x => console.log(x));

Test:

See:

public throttle(durationSelector: function(value: T): SubscribableOrPromise, config: Object): Observable<T> source

从源 Observable 中发出一个值,然后在由另一个 Observable 决定的期间内忽略 随后发出的源值,然后重复此过程。

它很像 throttleTime,但是沉默持续时间是由 第二个 Observable 决定的。

throttle 的内部定时器禁用时,它会在输出 Observable 上发出源 Observable 的值, 并当定时器启用时忽略源值。最开始时,定时器是禁用的。一旦第一个源值达到,它会被转发 到输出 Observable ,然后通过使用源值调用 durationSelector 函数来启动定时器,这 个函数返回 "duration" Observable 。当 duration Observable 发出值或完成时,定时器 会被禁用,并且下一个源值也是重复此过程。

Params:

NameTypeAttributeDescription
durationSelector function(value: T): SubscribableOrPromise

该函数 从源 Observable 中接收值,用于为每个源值计算沉默持续时间,并返回 Observable 或 Promise 。

config Object

用来定义 leadingtrailing 行为的配置对象。 默认为 { leading: true, trailing: false }

Return:

Observable<T>

该 Observable 执行节流操作,以限制源 Observable 的 发送频率。

Example:

以每秒最多点击一次的频率发出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttle(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

Test:

See:

public throttleTime(duration: number, scheduler: Scheduler): Observable<T> source

从源 Observable 中发出一个值,然后在 duration 毫秒内忽略随后发出的源值, 然后重复此过程。

让一个值通过,然后在接下来的 duration 毫秒内忽略源值。

throttle 的内部定时器禁用时,它会在输出 Observable 上发出源 Observable 的值, 并当定时器启用时忽略源值。最开始时,定时器是禁用的。一旦第一个源值达到,它会被转发 到输出 Observable ,然后启动定时器。在 duration 毫秒(或由可选的 scheduler 内部确定的时间单位)后,定时器会被禁用,并且下一个源值也是重复此过程。可选择性地 接收一个 IScheduler 用来管理定时器。

Params:

NameTypeAttributeDescription
duration number

在发出一个最新的值后,到再发出另外一个值之间的等待时间, 以毫秒为单位或以可选的 scheduler 内部决定的时间单位来衡量。

scheduler Scheduler
  • optional
  • default: async

调度器( IScheduler ),用来 管理处理节流的定时器。

Return:

Observable<T>

该 Observable 执行节流操作,以限制源 Observable 的 发送频率。

Example:

以每秒最多点击一次的频率发出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttleTime(1000);
result.subscribe(x => console.log(x));

Test:

See:

public timeInterval(scheduler: *): Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
scheduler *

Return:

Observable<TimeInterval<any>> | WebSocketSubject<T> | Observable<T>

Test:

public timeout(due: number, scheduler: Scheduler): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
due number
scheduler Scheduler
  • optional

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

public timeoutWith(due: *, withObservable: *, scheduler: *): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
due *
withObservable *
scheduler *

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

Test:

public timestamp(scheduler: *): Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
scheduler *

Return:

Observable<Timestamp<any>> | WebSocketSubject<T> | Observable<T>

Test:

public toArray(): Observable<any[]> | WebSocketSubject<T> | Observable<T> source

Return:

Observable<any[]> | WebSocketSubject<T> | Observable<T>

Test:

public toPromise(PromiseCtor: PromiseConstructor): Promise<T> source

将 Observable 序列转换为符合 ES2015 标准的 Promise 。

Params:

NameTypeAttributeDescription
PromiseCtor PromiseConstructor
  • optional

Promise 的构造函数。如果没有提供的话, 它首先会在 Rx.config.Promise 中寻找构造函数,然后会回退成原生的 Promise 构造函数 (如果有的话)。

Return:

Promise<T>

符合 ES2015 标准的 Promise,它使用 Observable 序列的最后一个值。

Example:

// 使用普通的 ES2015
let source = Rx.Observable
  .of(42)
  .toPromise();

source.then((value) => console.log('Value: %s', value));
// => Value: 42

// 被拒的 Promise
// 使用标准的 ES2015
let source = Rx.Observable
  .throw(new Error('woops'))
  .toPromise();

source
  .then((value) => console.log('Value: %s', value))
  .catch((err) => console.log('Error: %s', err));
// => Error: Error: woops

// 通过 config 进行设置
Rx.config.Promise = RSVP.Promise;

let source = Rx.Observable
  .of(42)
  .toPromise();

source.then((value) => console.log('Value: %s', value));
// => Value: 42

// 通过方法进行设置
let source = Rx.Observable
  .of(42)
  .toPromise(RSVP.Promise);

source.then((value) => console.log('Value: %s', value));
// => Value: 42

Test:

public window(windowBoundaries: Observable<any>): Observable<Observable<T>> source

每当 windowBoundaries 发出项时,将源 Observable 的值分支成嵌套的 Observable 。

就像是 buffer, 但发出的是嵌套的 Observable ,而不是数组。

返回的 Observable 发出从源 Observable 收集到的项的窗口。 输出 Observable 发出连接的,不重叠的 窗口. 当windowBoundaries Observable 开始发出数据,它会发出目前的窗口并且会打开一个新的。 因为每个窗口都是 Observable, 所以输出 Observable 是高阶 Observable。

Params:

NameTypeAttributeDescription
windowBoundaries Observable<any>

完成上一个窗口并且开启新窗口的 Observable。

Return:

Observable<Observable<T>>

每个窗口都是一个 Observable,它发出源 Observable 所发出的值。

Example:

在每个窗口(窗口间的时间间隔为1秒)中,最多发出两次点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var result = clicks.window(interval)
  .map(win => win.take(2)) // 每个窗口最多两个发送
  .mergeAll(); // 打平高阶 Observable
result.subscribe(x => console.log(x));

Test:

See:

public windowCount(windowSize: number, startWindowEvery: number): Observable<Observable<T>> source

将源 Observable 的值分支成多个嵌套的 Observable ,每个嵌套的 Observable 最多发出 windowSize 个值。

就像是 bufferCount, 但是返回嵌套的 Observable 而不是数组。

返回的 Observable 发出从源 Observable 收集到的项的窗口。 输出 Observable 每M(M = startWindowEvery)个项发出新窗口,每个窗口包含的项数不得超过N个(N = windowSize)。 当源 Observable 完成或者遇到错误,输出 Observable 发出当前窗口并且传播从源 Observable 收到的通知。 如果没有提供 startWindowEvery ,那么在源 Observable 的起始处立即开启新窗口,并且当每个窗口的大小达到 windowSize 时完成。

Params:

NameTypeAttributeDescription
windowSize number

每个窗口最多可以发送的个数。

startWindowEvery number
  • optional

开启新窗口的间隔。 比如,如果 startWindowEvery2, 新窗口会在源中每个第二个值开启。默认情况下,新窗口是在源 Observable 的起始处开启的。

Return:

Observable<Observable<T>>

窗口的 Observable,每个窗口又是值的 Observable 。(译者注:其实就是高阶 Observable )

Example:

从第一个点击事件开始,忽略第3N次点击
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowCount(3)
  .map(win => win.skip(1)) // 跳过每三个点击中的第一个
  .mergeAll(); // 打平高阶 Observable
result.subscribe(x => console.log(x));
从第三个点击事件开始,忽略第3N次点击
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.windowCount(2, 3)
  .mergeAll(); // 打平高阶 Observable
result.subscribe(x => console.log(x));

Test:

See:

public windowToggle(openings: Observable<O>, closingSelector: function(value: O): Observable): Observable<Observable<T>> source

将源 Observable 的值分支成嵌套的 Observable,分支策略是以 openings 发出项为起始,以 closingSelector 发出为结束。

就像是 bufferToggle, 但是发出的是嵌套 Observable 而不是数组。

返回的 Observable 发出从源 Observable 收集到的项的窗口。输出 Observable 发出窗口 ,每一个窗口 包括当 openings 发出时开始收集源 Observable 的数据项并且 closingSelector 返回的 Observable 发出项时结束收集。

Params:

NameTypeAttributeDescription
openings Observable<O>

通知开启新窗口的 observable。

closingSelector function(value: O): Observable

是一个接受openings observable 发出的值作为参数,并且返回 Observable 的函数, 当该 observable 发出 next 或者 complete时,会发信号给相关的窗口以通知它们应该完成。

Return:

Observable<Observable<T>>

窗口的 Observable,每个窗口又是值的 Observable 。(译者注:其实就是高阶 Observable )

Example:

每隔一秒钟, 发出接下来 500ms 的点击事件。
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var result = clicks.windowToggle(openings, i =>
  i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
).mergeAll();
result.subscribe(x => console.log(x));

Test:

See:

public windowWhen(closingSelector: function(): Observable): Observable<Observable<T>> source

将源 Observable 的值分支成嵌套的 Observable ,通过使用关闭 Observable 的工厂函数来决定何时开启新的窗口。

就像是 bufferWhen, 但是发出的是嵌套的 Observable 而不是数组。

返回的 Observable 发出从源 Observable 收集到的项的窗口。 输出 Observable 发出连接的,非重叠的窗口。 每当由指定的 closingSelector 函数产生的 Observable 发出项,它会发出当前窗口并开启一个新窗口。 当输出 Observable 被订阅的时候立马开启第一个窗口。

Params:

NameTypeAttributeDescription
closingSelector function(): Observable

函数,不接受参数并且返回 Observable, 该 Observable 发出信号(next 或者 complete)以决定何时关闭前一个窗口,开启新一个窗口。

Return:

Observable<Observable<T>>

窗口的 Observable,每个窗口又是值的 Observable(译者注:其实就是高阶 Observable )。

Example:

在每个秒速随机(1-5秒)的窗口中,只发出最开始的两次点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks
  .windowWhen(() => Rx.Observable.interval(1000 + Math.random() * 4000))
  .map(win => win.take(2)) // 每个窗口最多两个发送
  .mergeAll(); // 打平高阶Observable
result.subscribe(x => console.log(x));

Test:

See:

public withLatestFrom(other: ObservableInput, project: Function): Observable source

结合源 Observable 和另外的 Observables 以创建新的 Observable, 该 Observable 的值由每 个 Observable 最新的值计算得出,当且仅当源发出的时候。

每当源 Observable 发出值,它会计算一个公式,此公式使用该值加上其他输入 Observable 的最新值,然后发出公式的输出结果。

withLatestFrom 结合源 Observablecombines(实例)和其他输入 Observables 的最新值,当且仅当 source 发出数据时, 可选的使用 project 函数以决定输出 Observable 将要发出的值。 在输出 Observable 发出值之前,所有的输入 Observables 都必须发出至少一个值。

Params:

NameTypeAttributeDescription
other ObservableInput

输入 Observable ,用来和源 Observable 结合。 可以传入多个输入 Observables。

project Function
  • optional

将多个值合并的投射函数。顺序地接受所有 Observables 传入的值,第一个参数是源 Observable 的值。 (a.withLatestFrom(b, c, (a1, b1, c1) => a1 + b1 + c1))。 如果没有传入, 输入 Observable 会一直发送数组。

Return:

Observable

该 Observable 为一个拥有将每个输入 Observable 最新的值投射后的值, 或者一个包含所有输入 Observable 的最新值的数组。

Example:

对于每个点击事件,发出一个包含最新时间和点击事件的数组。
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var result = clicks.withLatestFrom(timer);
result.subscribe(x => console.log(x));

Test:

See:

public zipAll(project: *): Observable<R> | WebSocketSubject<T> | Observable<T> source

Params:

NameTypeAttributeDescription
project *

Return:

Observable<R> | WebSocketSubject<T> | Observable<T>

Test:

public zipProto(observables: *): Observable<R> source

Params:

NameTypeAttributeDescription
observables *

Return:

Observable<R>