1 import { Observable } from '../Observable';
2 import { OperatorFunction } from '../types';
3 import { Subject } from '../Subject';
4 import { Subscriber } from '../Subscriber';
5 import { OuterSubscriber } from '../OuterSubscriber';
6 import { InnerSubscriber } from '../InnerSubscriber';
7 import { subscribeToResult } from '../util/subscribeToResult';
8 import { Operator } from '../Operator';
/**
 * Branch out the source Observable values as a nested Observable whenever
 * `windowBoundaries` emits.
 *
 * <span class="informal">It's like {@link buffer}, but emits a nested Observable
 * instead of an array.</span>
 *
 * Returns an Observable that emits windows of items it collects from the source
 * Observable. The output Observable emits connected, non-overlapping
 * windows. It emits the current window and opens a new one whenever the
 * Observable `windowBoundaries` emits an item. Because each window is an
 * Observable, the output is a higher-order Observable.
 *
 * ## Example
 * In every window of 1 second each, emit at most 2 click events
 * ```ts
 * import { fromEvent, interval } from 'rxjs';
 * import { window, mergeAll, map, take } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const sec = interval(1000);
 * const result = clicks.pipe(
 *   window(sec),
 *   map(win => win.pipe(take(2))), // each window has at most 2 emissions
 *   mergeAll(),                    // flatten the Observable-of-Observables
 * );
 * result.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link windowCount}
 * @see {@link windowTime}
 * @see {@link windowToggle}
 * @see {@link windowWhen}
 *
 * @param {Observable<any>} windowBoundaries An Observable that completes the
 * previous window and starts a new window.
 * @return {Observable<Observable<T>>} An Observable of windows, which are
 * Observables emitting values of the source Observable.
 */
export function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>> {
  return function windowOperatorFunction(source: Observable<T>) {
    // Pin the element type explicitly: it cannot be inferred from
    // `windowBoundaries`, which is an `Observable<any>`.
    return source.lift(new WindowOperator<T>(windowBoundaries));
  };
}
/**
 * Operator passed to `lift()` by {@link window}. When invoked it wires a
 * `WindowSubscriber` between the source and the downstream subscriber and
 * attaches the boundary Observable to it.
 */
class WindowOperator<T> implements Operator<T, Observable<T>> {

  /**
   * @param windowBoundaries Observable whose notifications are forwarded to
   * the `WindowSubscriber` created in `call`.
   */
  constructor(private readonly windowBoundaries: Observable<any>) {
  }

  /**
   * Subscribes a new `WindowSubscriber` to `source` and, if that subscription
   * is still open, also subscribes it to `windowBoundaries`.
   *
   * @param subscriber Downstream subscriber that receives the windows.
   * @param source The Observable being windowed.
   * @return The subscription returned by `source.subscribe`.
   */
  call(subscriber: Subscriber<Observable<T>>, source: any): any {
    const windowSubscriber = new WindowSubscriber(subscriber);
    const sourceSubscription = source.subscribe(windowSubscriber);
    // Skip the boundary subscription when the source subscription is already
    // closed by the time `subscribe` returns; there is nothing left to window.
    if (!sourceSubscription.closed) {
      windowSubscriber.add(subscribeToResult(windowSubscriber, this.windowBoundaries));
    }
    return sourceSubscription;
  }
}
/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class WindowSubscriber<T> extends OuterSubscriber<T, any> {

  /** The window currently receiving source values. */
  private window: Subject<T> = new Subject<T>();

  /**
   * @param destination Downstream subscriber; it is handed the first window
   * immediately on construction.
   */
  constructor(destination: Subscriber<Observable<T>>) {
    // A derived-class constructor must call `super` before touching `this`.
    super(destination);
    destination.next(this.window);
  }

  /** A value from the boundary Observable closes the current window and opens the next. */
  notifyNext(outerValue: T, innerValue: any,
             outerIndex: number, innerIndex: number,
             innerSub: InnerSubscriber<T, any>): void {
    this.openWindow();
  }

  /** An error from the boundary Observable is treated exactly like a source error. */
  notifyError(error: any, innerSub: InnerSubscriber<T, any>): void {
    this._error(error);
  }

  /** Completion of the boundary Observable is treated exactly like source completion. */
  notifyComplete(innerSub: InnerSubscriber<T, any>): void {
    this._complete();
  }

  /** Forwards a source value into the current window. */
  protected _next(value: T): void {
    this.window.next(value);
  }

  /** Errors the current window first, then the outer destination. */
  protected _error(err: any): void {
    this.window.error(err);
    this.destination.error(err);
  }

  /** Completes the current window first, then the outer destination. */
  protected _complete(): void {
    this.window.complete();
    this.destination.complete();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _unsubscribe() {
    // Drop the reference to the current window on teardown. `null!` keeps the
    // field's declared type while still clearing it at runtime.
    this.window = null!;
  }

  /**
   * Completes the current window (if there still is one) and emits a fresh
   * window to the destination.
   */
  private openWindow(): void {
    const prevWindow = this.window;
    // The window reference is cleared in `_unsubscribe`, hence the guard.
    if (prevWindow) {
      prevWindow.complete();
    }
    const destination = this.destination;
    const newWindow = this.window = new Subject<T>();
    destination.next(newWindow);
  }
}