1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { Observable } from '../Observable';
4 import { OperatorFunction, TeardownLogic } from '../types';
7 * Buffers the source Observable values until the size hits the maximum
10 * <span class="informal">Collects values from the past as an array, and emits
11 * that array only when its size reaches `bufferSize`.</span>
13 * ![](bufferCount.png)
15 * Buffers a number of values from the source Observable by `bufferSize` then
16 * emits the buffer and clears it, and starts a new buffer each
17 * `startBufferEvery` values. If `startBufferEvery` is not provided or is
18 * `null`, then new buffers are started immediately at the start of the source
19 * and when each buffer closes and is emitted.
23 * Emit the last two click events as an array
26 * import { fromEvent } from 'rxjs';
27 * import { bufferCount } from 'rxjs/operators';
29 * const clicks = fromEvent(document, 'click');
30 * const buffered = clicks.pipe(bufferCount(2));
31 * buffered.subscribe(x => console.log(x));
34 * On every click, emit the last two click events as an array
37 * import { fromEvent } from 'rxjs';
38 * import { bufferCount } from 'rxjs/operators';
40 * const clicks = fromEvent(document, 'click');
41 * const buffered = clicks.pipe(bufferCount(2, 1));
42 * buffered.subscribe(x => console.log(x));
46 * @see {@link bufferTime}
47 * @see {@link bufferToggle}
48 * @see {@link bufferWhen}
49 * @see {@link pairwise}
50 * @see {@link windowCount}
52 * @param {number} bufferSize The maximum size of the buffer emitted.
53 * @param {number} [startBufferEvery] Interval at which to start a new buffer.
54 * For example if `startBufferEvery` is `2`, then a new buffer will be started
55 * on every other value from the source. A new buffer is started at the
56 * beginning of the source by default.
57 * @return {Observable<T[]>} An Observable of arrays of buffered values.
61 export function bufferCount<T>(bufferSize: number, startBufferEvery: number = null): OperatorFunction<T, T[]> {
62 return function bufferCountOperatorFunction(source: Observable<T>) {
63 return source.lift(new BufferCountOperator<T>(bufferSize, startBufferEvery));
67 class BufferCountOperator<T> implements Operator<T, T[]> {
68 private subscriberClass: any;
70 constructor(private bufferSize: number, private startBufferEvery: number) {
71 if (!startBufferEvery || bufferSize === startBufferEvery) {
72 this.subscriberClass = BufferCountSubscriber;
74 this.subscriberClass = BufferSkipCountSubscriber;
78 call(subscriber: Subscriber<T[]>, source: any): TeardownLogic {
79 return source.subscribe(new this.subscriberClass(subscriber, this.bufferSize, this.startBufferEvery));
84 * We need this JSDoc comment for affecting ESDoc.
88 class BufferCountSubscriber<T> extends Subscriber<T> {
89 private buffer: T[] = [];
91 constructor(destination: Subscriber<T[]>, private bufferSize: number) {
95 protected _next(value: T): void {
96 const buffer = this.buffer;
100 if (buffer.length == this.bufferSize) {
101 this.destination.next(buffer);
106 protected _complete(): void {
107 const buffer = this.buffer;
108 if (buffer.length > 0) {
109 this.destination.next(buffer);
116 * We need this JSDoc comment for affecting ESDoc.
120 class BufferSkipCountSubscriber<T> extends Subscriber<T> {
121 private buffers: Array<T[]> = [];
122 private count: number = 0;
124 constructor(destination: Subscriber<T[]>, private bufferSize: number, private startBufferEvery: number) {
128 protected _next(value: T): void {
129 const { bufferSize, startBufferEvery, buffers, count } = this;
132 if (count % startBufferEvery === 0) {
136 for (let i = buffers.length; i--; ) {
137 const buffer = buffers[i];
139 if (buffer.length === bufferSize) {
140 buffers.splice(i, 1);
141 this.destination.next(buffer);
146 protected _complete(): void {
147 const { buffers, destination } = this;
149 while (buffers.length > 0) {
150 let buffer = buffers.shift();
151 if (buffer.length > 0) {
152 destination.next(buffer);