1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { Observable } from '../Observable';
4 import { Subject } from '../Subject';
5 import { Subscription } from '../Subscription';
6 import { OuterSubscriber } from '../OuterSubscriber';
7 import { InnerSubscriber } from '../InnerSubscriber';
8 import { subscribeToResult } from '../util/subscribeToResult';
9 import { OperatorFunction } from '../types';
12 * Branch out the source Observable values as a nested Observable starting from
13 * an emission from `openings` and ending when the output of `closingSelector`
16 * <span class="informal">It's like {@link bufferToggle}, but emits a nested
17 * Observable instead of an array.</span>
19 * ![](windowToggle.png)
21 * Returns an Observable that emits windows of items it collects from the source
22 * Observable. The output Observable emits windows that contain those items
23 * emitted by the source Observable between the time when the `openings`
24 * Observable emits an item and when the Observable returned by
25 * `closingSelector` emits an item.
28 * Every other second, emit the click events from the next 500ms
30 * import { fromEvent, interval, EMPTY } from 'rxjs';
31 * import { windowToggle, mergeAll } from 'rxjs/operators';
33 * const clicks = fromEvent(document, 'click');
34 * const openings = interval(1000);
35 * const result = clicks.pipe(
36 * windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
39 * result.subscribe(x => console.log(x));
43 * @see {@link windowCount}
44 * @see {@link windowTime}
45 * @see {@link windowWhen}
46 * @see {@link bufferToggle}
48 * @param {Observable<O>} openings An observable of notifications to start new
50 * @param {function(value: O): Observable} closingSelector A function that takes
51 * the value emitted by the `openings` observable and returns an Observable,
52 * which, when it emits (either `next` or `complete`), signals that the
53 * associated window should complete.
54 * @return {Observable<Observable<T>>} An observable of windows, which in turn
56 * @method windowToggle
59 export function windowToggle<T, O>(openings: Observable<O>,
60 closingSelector: (openValue: O) => Observable<any>): OperatorFunction<T, Observable<T>> {
61 return (source: Observable<T>) => source.lift(new WindowToggleOperator<T, O>(openings, closingSelector));
64 class WindowToggleOperator<T, O> implements Operator<T, Observable<T>> {
66 constructor(private openings: Observable<O>,
67 private closingSelector: (openValue: O) => Observable<any>) {
70 call(subscriber: Subscriber<Observable<T>>, source: any): any {
71 return source.subscribe(new WindowToggleSubscriber(
72 subscriber, this.openings, this.closingSelector
77 interface WindowContext<T> {
79 subscription: Subscription;
83 * We need this JSDoc comment for affecting ESDoc.
87 class WindowToggleSubscriber<T, O> extends OuterSubscriber<T, any> {
88 private contexts: WindowContext<T>[] = [];
89 private openSubscription: Subscription;
91 constructor(destination: Subscriber<Observable<T>>,
92 private openings: Observable<O>,
93 private closingSelector: (openValue: O) => Observable<any>) {
95 this.add(this.openSubscription = subscribeToResult(this, openings, openings as any));
98 protected _next(value: T) {
99 const { contexts } = this;
101 const len = contexts.length;
102 for (let i = 0; i < len; i++) {
103 contexts[i].window.next(value);
108 protected _error(err: any) {
110 const { contexts } = this;
111 this.contexts = null;
114 const len = contexts.length;
117 while (++index < len) {
118 const context = contexts[index];
119 context.window.error(err);
120 context.subscription.unsubscribe();
127 protected _complete() {
128 const { contexts } = this;
129 this.contexts = null;
131 const len = contexts.length;
133 while (++index < len) {
134 const context = contexts[index];
135 context.window.complete();
136 context.subscription.unsubscribe();
142 /** @deprecated This is an internal implementation detail, do not use. */
144 const { contexts } = this;
145 this.contexts = null;
147 const len = contexts.length;
149 while (++index < len) {
150 const context = contexts[index];
151 context.window.unsubscribe();
152 context.subscription.unsubscribe();
157 notifyNext(outerValue: any, innerValue: any,
158 outerIndex: number, innerIndex: number,
159 innerSub: InnerSubscriber<T, any>): void {
161 if (outerValue === this.openings) {
164 const { closingSelector } = this;
165 closingNotifier = closingSelector(innerValue);
167 return this.error(e);
170 const window = new Subject<T>();
171 const subscription = new Subscription();
172 const context = { window, subscription };
173 this.contexts.push(context);
174 const innerSubscription = subscribeToResult(this, closingNotifier, context as any);
176 if (innerSubscription.closed) {
177 this.closeWindow(this.contexts.length - 1);
179 (<any>innerSubscription).context = context;
180 subscription.add(innerSubscription);
183 this.destination.next(window);
185 this.closeWindow(this.contexts.indexOf(outerValue));
189 notifyError(err: any): void {
193 notifyComplete(inner: Subscription): void {
194 if (inner !== this.openSubscription) {
195 this.closeWindow(this.contexts.indexOf((<any> inner).context));
199 private closeWindow(index: number): void {
204 const { contexts } = this;
205 const context = contexts[index];
206 const { window, subscription } = context;
207 contexts.splice(index, 1);
209 subscription.unsubscribe();