1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { Observable } from '../Observable';
4 import { Subscription } from '../Subscription';
5 import { subscribeToResult } from '../util/subscribeToResult';
6 import { OuterSubscriber } from '../OuterSubscriber';
7 import { InnerSubscriber } from '../InnerSubscriber';
8 import { OperatorFunction, SubscribableOrPromise } from '../types';
/**
 * Buffers the values of the source Observable, starting a new buffer each
 * time `openings` emits and ending that buffer when the output of
 * `closingSelector` emits.
 *
 * <span class="informal">Collects values from the past as an array. Collection
 * starts only when `opening` emits, and the `closingSelector` function is
 * called to obtain an Observable that tells when to close the buffer.</span>
 *
 * ![](bufferToggle.png)
 *
 * A buffer is opened by each signal from the Observable provided to
 * `openings`; it is closed and sent downstream when the Subscribable or
 * Promise returned by the `closingSelector` function emits.
 *
 * ## Example
 *
 * Every other second, emit the click events from the next 500ms
 *
 * ```ts
 * import { fromEvent, interval, EMPTY } from 'rxjs';
 * import { bufferToggle } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const openings = interval(1000);
 * const buffered = clicks.pipe(bufferToggle(openings, i =>
 *   i % 2 ? interval(500) : EMPTY
 * ));
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link bufferCount}
 * @see {@link bufferTime}
 * @see {@link bufferWhen}
 * @see {@link windowToggle}
 *
 * @param {SubscribableOrPromise<O>} openings A Subscribable or Promise of notifications to start new
 * buffers.
 * @param {function(value: O): SubscribableOrPromise} closingSelector A function that takes
 * the value emitted by the `openings` observable and returns a Subscribable or Promise,
 * which, when it emits, signals that the associated buffer should be emitted
 * and cleared.
 * @return {Observable<T[]>} An observable of arrays of buffered values.
 * @method bufferToggle
 */
export function bufferToggle<T, O>(
  openings: SubscribableOrPromise<O>,
  closingSelector: (value: O) => SubscribableOrPromise<any>
): OperatorFunction<T, T[]> {
  return function bufferToggleOperatorFunction(source: Observable<T>) {
    // A fresh operator is built for every source the returned function is applied to.
    const operator = new BufferToggleOperator<T, O>(openings, closingSelector);
    return source.lift(operator);
  };
}

/**
 * Operator passed to `lift` by `bufferToggle`. It only stores the `openings`
 * notifier and the `closingSelector`, and hands both to a new
 * `BufferToggleSubscriber` each time it is called.
 */
class BufferToggleOperator<T, O> implements Operator<T, T[]> {

  constructor(private openings: SubscribableOrPromise<O>,
              private closingSelector: (value: O) => SubscribableOrPromise<any>) {
  }

  /**
   * Subscribes a new `BufferToggleSubscriber`, wrapping `subscriber`, to `source`.
   *
   * @param subscriber The downstream subscriber that receives the buffers.
   * @param source The source to subscribe to.
   * @return Whatever `source.subscribe` returns.
   */
  call(subscriber: Subscriber<T[]>, source: any): any {
    const { openings, closingSelector } = this;
    return source.subscribe(
      new BufferToggleSubscriber<T, O>(subscriber, openings, closingSelector)
    );
  }
}

/**
 * Bookkeeping record for one open buffer.
 */
interface BufferContext<T> {
  /** Source values collected since this buffer was opened. */
  buffer: T[];
  /** Subscription associated with this buffer; unsubscribed when the buffer is closed. */
  subscription: Subscription;
}

/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
  /** Currently open buffers; set to `null` once the source has errored or completed. */
  private contexts: Array<BufferContext<T>> = [];

  /**
   * @param destination Downstream subscriber that receives the closed buffers.
   * @param openings Notifier whose emissions each open a new buffer.
   * @param closingSelector Called with each `openings` value; its result, when
   * truthy, is subscribed to as the closing notifier of the new buffer.
   */
  constructor(destination: Subscriber<T[]>,
              private openings: SubscribableOrPromise<O>,
              private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
    super(destination);
    this.add(subscribeToResult(this, openings));
  }

  /** Appends the source value to every buffer that is currently open. */
  protected _next(value: T): void {
    const contexts = this.contexts;
    const len = contexts.length;
    for (let i = 0; i < len; i++) {
      contexts[i].buffer.push(value);
    }
  }

  /** Drops all open buffers without emitting them, then forwards the error. */
  protected _error(err: any): void {
    const contexts = this.contexts;
    while (contexts.length > 0) {
      const context = contexts.shift();
      context.subscription.unsubscribe();
      context.buffer = null;
      context.subscription = null;
    }
    this.contexts = null;
    super._error(err);
  }

  /** Emits every open buffer in opening order, then forwards completion. */
  protected _complete(): void {
    const contexts = this.contexts;
    while (contexts.length > 0) {
      const context = contexts.shift();
      this.destination.next(context.buffer);
      context.subscription.unsubscribe();
      context.buffer = null;
      context.subscription = null;
    }
    this.contexts = null;
    super._complete();
  }

  /**
   * Handles a value from either kind of inner subscription. The `openings`
   * subscription is created without an outer value, while every closing
   * notifier is subscribed with its buffer context as the outer value, so a
   * truthy `outerValue` means "close that buffer" and a falsy one means
   * "open a new buffer".
   */
  notifyNext(outerValue: any, innerValue: O,
             outerIndex: number, innerIndex: number,
             innerSub: InnerSubscriber<T, O>): void {
    if (outerValue) {
      this.closeBuffer(outerValue);
    } else {
      this.openBuffer(innerValue);
    }
  }

  /**
   * Closes the buffer attached to a completed inner subscription. Inner
   * subscriptions that carry no `context` property make `closeBuffer` a no-op.
   */
  notifyComplete(innerSub: InnerSubscriber<T, O>): void {
    this.closeBuffer((<any> innerSub).context);
  }

  /**
   * Invokes the closing selector for an `openings` value and, if it returns
   * something truthy, opens a buffer tied to that closing notifier. Anything
   * thrown along the way is routed to `_error`.
   */
  private openBuffer(value: O): void {
    try {
      const closingSelector = this.closingSelector;
      const closingNotifier = closingSelector.call(this, value);
      if (closingNotifier) {
        this.trySubscribe(closingNotifier);
      }
    } catch (err) {
      this._error(err);
    }
  }

  /**
   * Emits the buffer of `context` downstream and stops tracking it.
   *
   * Does nothing when the subscriber has already been torn down, when no
   * context is given, or when the context is no longer tracked. The last
   * check matters for closing notifiers that emit and complete synchronously:
   * the buffer is closed from `notifyNext`, and `trySubscribe` then asks to
   * close it again. Without the check the buffer would be emitted twice and
   * `splice(-1, 1)` would remove an unrelated buffer.
   */
  private closeBuffer(context: BufferContext<T>): void {
    const contexts = this.contexts;

    if (!contexts || !context || contexts.indexOf(context) === -1) {
      return;
    }

    const { buffer, subscription } = context;
    this.destination.next(buffer);

    // Look the context up again: the emission above may have re-entered
    // this subscriber and changed the list.
    const index = contexts.indexOf(context);
    if (index !== -1) {
      contexts.splice(index, 1);
    }
    this.remove(subscription);
    subscription.unsubscribe();
  }

  /**
   * Registers a new, empty buffer and subscribes to its closing notifier,
   * passing the buffer context as the outer value so `notifyNext` can find it.
   */
  private trySubscribe(closingNotifier: any): void {
    const contexts = this.contexts;

    const buffer: Array<T> = [];
    const subscription = new Subscription();
    const context = { buffer, subscription };
    contexts.push(context);

    const innerSubscription = subscribeToResult(this, closingNotifier, <any>context);

    if (!innerSubscription || innerSubscription.closed) {
      // The notifier finished during subscription; make sure the buffer is closed.
      this.closeBuffer(context);
    } else {
      // Lets notifyComplete map the inner subscription back to its buffer.
      (<any> innerSubscription).context = context;

      this.add(innerSubscription);
      subscription.add(innerSubscription);
    }
  }
}