1 import { Subscriber } from '../Subscriber';
2 import { Subscription } from '../Subscription';
3 import { Observable } from '../Observable';
4 import { Subject } from '../Subject';
// Operator factory: returns a function that takes a source and lifts a
// GroupByOperator onto it, forwarding all four selector arguments unchanged.
// NOTE(review): the closing brace of this function is not visible in this view.
5 export function groupBy(keySelector, elementSelector, durationSelector, subjectSelector) {
// `source.lift(...)` is called with a freshly constructed operator per invocation.
6 return (source) => source.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector));
// Operator object handed to `source.lift(...)` by groupBy. It only stores the
// four selectors and, when `call` runs, subscribes to the source through a new
// GroupBySubscriber.
// NOTE(review): closing braces for the constructor, `call`, and the class are
// not visible in this view.
8 class GroupByOperator {
// Stores each selector on the instance under the same name.
9 constructor(keySelector, elementSelector, durationSelector, subjectSelector) {
10 this.keySelector = keySelector;
11 this.elementSelector = elementSelector;
12 this.durationSelector = durationSelector;
13 this.subjectSelector = subjectSelector;
// Wraps `subscriber` in a GroupBySubscriber configured with the stored
// selectors and returns whatever `source.subscribe(...)` returns.
15 call(subscriber, source) {
16 return source.subscribe(new GroupBySubscriber(subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector));
// Subscriber that routes source values into per-key group subjects and emits a
// GroupedObservable downstream the first time a key is seen.
// NOTE(review): several lines of this class (method headers, try/catch blocks,
// closing braces, the super(...) call) are not visible in this view; comments
// below describe only the statements that are shown.
19 class GroupBySubscriber extends Subscriber {
// Stores the selectors and initialises the unsubscribe-requested flag.
20 constructor(destination, keySelector, elementSelector, durationSelector, subjectSelector) {
22 this.keySelector = keySelector;
23 this.elementSelector = elementSelector;
24 this.durationSelector = durationSelector;
25 this.subjectSelector = subjectSelector;
// Set to true further down (see the assignment next to the `count === 0` check).
27 this.attemptedToUnsubscribe = false;
// Derive the grouping key for the incoming value via the user-supplied selector.
33 key = this.keySelector(value);
// Hand the value and its key to the grouping routine.
39 this._group(value, key);
// Read the key -> group map; a new Map is assigned to `this.groups` below.
// NOTE(review): the condition guarding the Map creation is not visible here —
// presumably it only runs when no map exists yet; confirm against the full file.
42 let groups = this.groups;
44 groups = this.groups = new Map();
// Look up an existing group for this key (undefined if none has been stored).
46 let group = groups.get(key);
// When an elementSelector was supplied, it is applied to the value to produce
// the element.
48 if (this.elementSelector) {
50 element = this.elementSelector(value);
// Create the subject backing a group: the result of subjectSelector() when one
// was supplied, otherwise a plain Subject. It is then registered under `key`.
// NOTE(review): presumably guarded by a "no existing group" check not visible here.
60 group = (this.subjectSelector ? this.subjectSelector() : new Subject());
61 groups.set(key, group);
// The GroupedObservable emitted downstream receives `this` as its third
// argument (named `refCountSubscription` in GroupedObservable's constructor).
62 const groupedObservable = new GroupedObservable(key, group, this);
63 this.destination.next(groupedObservable);
// When a durationSelector was supplied, it is called with a second
// GroupedObservable built WITHOUT the third argument, and the resulting
// `duration` is subscribed with a GroupDurationSubscriber. That subscription is
// added to this subscriber via `this.add(...)`.
64 if (this.durationSelector) {
67 duration = this.durationSelector(new GroupedObservable(key, group));
73 this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
// Error path: iterates every group (callback body not visible in this view),
// then forwards `err` to the destination.
81 const groups = this.groups;
83 groups.forEach((group, key) => {
88 this.destination.error(err);
// Completion path: iterates every group (callback body not visible in this
// view), then completes the destination.
91 const groups = this.groups;
93 groups.forEach((group, key) => {
98 this.destination.complete();
// Removes the entry for `key` from the group map.
101 this.groups.delete(key);
// Records that an unsubscribe was requested; InnerRefCountSubscription reads
// this flag together with `count`. The body of the `count === 0` branch is not
// visible in this view.
105 this.attemptedToUnsubscribe = true;
106 if (this.count === 0) {
// Subscriber attached to the observable returned by durationSelector for one
// group (see the `duration.subscribe(...)` call in GroupBySubscriber).
// NOTE(review): the super(...) call, other field assignments, the header of the
// method containing the statements below, and closing braces are not visible
// in this view.
112 class GroupDurationSubscriber extends Subscriber {
113 constructor(key, group, parent) {
// `parent` is the GroupBySubscriber that created this instance.
117 this.parent = parent;
// Capture parent and key in locals, then null the instance fields before
// asking the parent to remove this key's group.
123 const { parent, key } = this;
124 this.key = this.parent = null;
126 parent.removeGroup(key);
// Observable representing a single group. Subscribing to it subscribes to the
// group's backing subject and, when a ref-count subscription was supplied,
// also registers an InnerRefCountSubscription against it.
// NOTE(review): the super(...) call, any `key` assignment, the return statement
// of `_subscribe`, and closing braces are not visible in this view.
130 export class GroupedObservable extends Observable {
// `refCountSubscription` may be omitted: GroupBySubscriber constructs one
// instance with it and one (for durationSelector) without it.
131 constructor(key, groupSubject, refCountSubscription) {
134 this.groupSubject = groupSubject;
135 this.refCountSubscription = refCountSubscription;
// Builds a composite Subscription for this subscriber.
137 _subscribe(subscriber) {
138 const subscription = new Subscription();
139 const { refCountSubscription, groupSubject } = this;
// Only track against the ref-count subscription when one exists and it is
// not already closed.
140 if (refCountSubscription && !refCountSubscription.closed) {
141 subscription.add(new InnerRefCountSubscription(refCountSubscription));
// Always subscribe the caller to the group's subject.
143 subscription.add(groupSubject.subscribe(subscriber));
// Subscription created per GroupedObservable subscriber when a ref-count
// subscription is available; `parent` is that ref-count subscription.
// NOTE(review): the super(...) call, the header of the method containing the
// statements below, the body of the first `if`, and closing braces are not
// visible in this view.
147 class InnerRefCountSubscription extends Subscription {
148 constructor(parent) {
150 this.parent = parent;
154 const parent = this.parent;
// Act only while neither the parent nor this subscription is closed.
155 if (!parent.closed && !this.closed) {
// Once the parent's count is zero and it has recorded an unsubscribe attempt
// (`attemptedToUnsubscribe`), unsubscribe the parent.
158 if (parent.count === 0 && parent.attemptedToUnsubscribe) {
159 parent.unsubscribe();
164 //# sourceMappingURL=groupBy.js.map