1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { Observable } from '../Observable';
4 import { Subscription } from '../Subscription';
5 import { OuterSubscriber } from '../OuterSubscriber';
6 import { InnerSubscriber } from '../InnerSubscriber';
7 import { subscribeToResult } from '../util/subscribeToResult';
8 import { OperatorFunction } from '../types';
11 * Buffers the source Observable values, using a factory function of closing
12 * Observables to determine when to close, emit, and reset the buffer.
14 * <span class="informal">Collects values from the past as an array. When it
15 * starts collecting values, it calls a function that returns an Observable that
16 * tells when to close the buffer and restart collecting.</span>
20 * Opens a buffer immediately, then closes the buffer when the observable
21 * returned by calling `closingSelector` function emits a value. When it closes
22 * the buffer, it immediately opens a new buffer and repeats the process.
26 * Emit an array of the most recent clicks at a random interval of 1 to 5 seconds
29 * import { fromEvent, interval } from 'rxjs';
30 * import { bufferWhen } from 'rxjs/operators';
32 * const clicks = fromEvent(document, 'click');
33 * const buffered = clicks.pipe(bufferWhen(() =>
34 * interval(1000 + Math.random() * 4000)
35 * ));
36 * buffered.subscribe(x => console.log(x));
41 * @see {@link bufferCount}
42 * @see {@link bufferTime}
43 * @see {@link bufferToggle}
44 * @see {@link windowWhen}
46 * @param {function(): Observable} closingSelector A function that takes no
47 * arguments and returns an Observable that signals buffer closure.
48 * @return {Observable<T[]>} An observable of arrays of buffered values.
// Creates the `bufferWhen` operator function: it captures `closingSelector`
// and returns a function that lifts a BufferWhenOperator onto its source.
// NOTE(review): the embedded line numbers skip values after this point, so
// the closing lines of this function are not visible in this view.
52 export function bufferWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, T[]> {
// The returned function receives the source Observable to operate on.
53 return function (source: Observable<T>) {
// A new BufferWhenOperator wrapping the same selector is built on each call
// and handed to `source.lift`; the result of `lift` is returned as-is.
54 return source.lift(new BufferWhenOperator(closingSelector));
// Operator object passed to `lift` by `bufferWhen`. It only stores the
// closing selector and, when `call` is invoked, subscribes a
// BufferWhenSubscriber to the source.
// NOTE(review): some lines of this class (constructor body, closing braces)
// are not visible in this view.
58 class BufferWhenOperator<T> implements Operator<T, T[]> {
// The `private` parameter property stores `closingSelector` on the instance.
60 constructor(private closingSelector: () => Observable<any>) {
// Wraps the downstream `subscriber` in a BufferWhenSubscriber (forwarding the
// stored selector) and returns whatever `source.subscribe` returns.
63 call(subscriber: Subscriber<T[]>, source: any): any {
64 return source.subscribe(new BufferWhenSubscriber(subscriber, this.closingSelector));
69 * This JSDoc comment is needed to affect how ESDoc processes this class.
// Subscriber that appends source values to `this.buffer`, forwards the buffer
// to `destination`, and keeps the subscription to the current closing
// notifier in `closingSubscription`.
// NOTE(review): the embedded line numbers skip values throughout this class,
// so several lines (some method headers, guards, closing braces) are not
// visible in this view. The comments below describe only visible statements.
73 class BufferWhenSubscriber<T> extends OuterSubscriber<T, any> {
// Raised immediately before subscribing to the closing notifier and lowered
// right after; `notifyComplete` reads it.
75 private subscribing: boolean = false;
// Subscription created for the current closing notifier; it is removed and
// unsubscribed before a replacement is created.
76 private closingSubscription: Subscription;
// `destination` is typed to receive arrays of buffered values; the `private`
// parameter property stores `closingSelector` on the instance.
// NOTE(review): the constructor body is not visible here.
78 constructor(destination: Subscriber<T[]>, private closingSelector: () => Observable<any>) {
// Appends the incoming source value to the current buffer.
83 protected _next(value: T) {
84 this.buffer.push(value);
// Reads the current buffer and passes it to `destination.next`.
// NOTE(review): a line between the read and the emission is not visible, so
// the emission may be guarded — confirm against the full file.
87 protected _complete() {
88 const buffer = this.buffer;
90 this.destination.next(buffer);
95 /** @deprecated This is an internal implementation detail, do not use. */
// Lowers the `subscribing` flag.
// NOTE(review): the header of the enclosing method is not visible here;
// presumably this is the teardown hook — confirm.
98 this.subscribing = false;
// Notification hook carrying the outer value, the inner value, both indices
// and the inner subscriber. None of the parameters is used in the visible lines.
// NOTE(review): the body is not visible here; presumably this runs when the
// closing notifier emits — confirm.
101 notifyNext(outerValue: T, innerValue: any,
102 outerIndex: number, innerIndex: number,
103 innerSub: InnerSubscriber<T, any>): void {
// Branches on whether this call arrives while `subscribing` is raised, i.e.
// during the `subscribeToResult` call at the end of this class.
// NOTE(review): the bodies of both branches are not visible here.
107 notifyComplete(): void {
108 if (this.subscribing) {
// NOTE(review): the header of the method containing the statements below is
// not visible here.
// Take the current closing subscription into a local that is reassigned below.
116 let { closingSubscription } = this;
// If a previous closing subscription exists, detach it from this subscriber
// and unsubscribe it before setting up a new one.
118 if (closingSubscription) {
119 this.remove(closingSubscription);
120 closingSubscription.unsubscribe();
// Read the current buffer and forward it downstream.
// NOTE(review): lines around this emission are not visible, so it may be
// guarded and the buffer may be reset afterwards — confirm.
123 const buffer = this.buffer;
125 this.destination.next(buffer);
// Call the selector to obtain the next closing notifier.
132 const { closingSelector } = this;
133 closingNotifier = closingSelector();
// Route `err` to `this.error` and stop here.
// NOTE(review): the try/catch lines are not visible; presumably `err` is an
// exception thrown by the selector call above — confirm.
135 return this.error(err);
// Create a fresh Subscription, remember it on the instance, and register it
// on this subscriber.
137 closingSubscription = new Subscription();
138 this.closingSubscription = closingSubscription;
139 this.add(closingSubscription);
// The flag is raised only for the duration of `subscribeToResult`, so that a
// notification delivered during that call can be told apart from a later one
// (`notifyComplete` checks the flag).
140 this.subscribing = true;
141 closingSubscription.add(subscribeToResult(this, closingNotifier));
142 this.subscribing = false;