1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { Observable } from '../Observable';
4 import { Subscription } from '../Subscription';
5 import { OperatorFunction } from '../types';
6 import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
9 * Buffers the source Observable values, using a factory function of closing
10 * Observables to determine when to close, emit, and reset the buffer.
12 * <span class="informal">Collects values from the past as an array. When it
13 * starts collecting values, it calls a function that returns an Observable that
14 * tells when to close the buffer and restart collecting.</span>
18 * Opens a buffer immediately, then closes the buffer when the observable
19 * returned by calling `closingSelector` function emits a value. When it closes
20 * the buffer, it immediately opens a new buffer and repeats the process.
24 * Emit an array of the last clicks every [1-5] random seconds
27 * import { fromEvent, interval } from 'rxjs';
28 * import { bufferWhen } from 'rxjs/operators';
30 * const clicks = fromEvent(document, 'click');
31 * const buffered = clicks.pipe(bufferWhen(() =>
32 * interval(1000 + Math.random() * 4000)
34 * buffered.subscribe(x => console.log(x));
39 * @see {@link bufferCount}
40 * @see {@link bufferTime}
41 * @see {@link bufferToggle}
42 * @see {@link windowWhen}
44 * @param {function(): Observable} closingSelector A function that takes no
45 * arguments and returns an Observable that signals buffer closure.
46 * @return {Observable<T[]>} An observable of arrays of buffered values.
50 export function bufferWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, T[]> {
51 return function (source: Observable<T>) {
52 return source.lift(new BufferWhenOperator(closingSelector));
56 class BufferWhenOperator<T> implements Operator<T, T[]> {
58 constructor(private closingSelector: () => Observable<any>) {
61 call(subscriber: Subscriber<T[]>, source: any): any {
62 return source.subscribe(new BufferWhenSubscriber(subscriber, this.closingSelector));
67 * We need this JSDoc comment for affecting ESDoc.
71 class BufferWhenSubscriber<T> extends SimpleOuterSubscriber<T, any> {
73 private subscribing: boolean = false;
74 private closingSubscription?: Subscription;
76 constructor(destination: Subscriber<T[]>, private closingSelector: () => Observable<any>) {
81 protected _next(value: T) {
82 this.buffer!.push(value);
85 protected _complete() {
86 const buffer = this.buffer;
88 this.destination.next!(buffer);
93 /** @deprecated This is an internal implementation detail, do not use. */
95 this.buffer = undefined;
96 this.subscribing = false;
103 notifyComplete(): void {
104 if (this.subscribing) {
112 let { closingSubscription } = this;
114 if (closingSubscription) {
115 this.remove(closingSubscription);
116 closingSubscription.unsubscribe();
119 const buffer = this.buffer;
121 this.destination.next!(buffer);
128 const { closingSelector } = this;
129 closingNotifier = closingSelector();
131 return this.error(err);
133 closingSubscription = new Subscription();
134 this.closingSubscription = closingSubscription;
135 this.add(closingSubscription);
136 this.subscribing = true;
137 closingSubscription.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this)));
138 this.subscribing = false;