+++ /dev/null
-import { Subscriber } from '../Subscriber';
-import { Subscription } from '../Subscription';
-import { Observable } from '../Observable';
-import { Subject } from '../Subject';
-export function groupBy(keySelector, elementSelector, durationSelector, subjectSelector) {
- return (source) => source.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector));
-}
-class GroupByOperator {
- constructor(keySelector, elementSelector, durationSelector, subjectSelector) {
- this.keySelector = keySelector;
- this.elementSelector = elementSelector;
- this.durationSelector = durationSelector;
- this.subjectSelector = subjectSelector;
- }
- call(subscriber, source) {
- return source.subscribe(new GroupBySubscriber(subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector));
- }
-}
-class GroupBySubscriber extends Subscriber {
- constructor(destination, keySelector, elementSelector, durationSelector, subjectSelector) {
- super(destination);
- this.keySelector = keySelector;
- this.elementSelector = elementSelector;
- this.durationSelector = durationSelector;
- this.subjectSelector = subjectSelector;
- this.groups = null;
- this.attemptedToUnsubscribe = false;
- this.count = 0;
- }
- _next(value) {
- let key;
- try {
- key = this.keySelector(value);
- }
- catch (err) {
- this.error(err);
- return;
- }
- this._group(value, key);
- }
- _group(value, key) {
- let groups = this.groups;
- if (!groups) {
- groups = this.groups = new Map();
- }
- let group = groups.get(key);
- let element;
- if (this.elementSelector) {
- try {
- element = this.elementSelector(value);
- }
- catch (err) {
- this.error(err);
- }
- }
- else {
- element = value;
- }
- if (!group) {
- group = (this.subjectSelector ? this.subjectSelector() : new Subject());
- groups.set(key, group);
- const groupedObservable = new GroupedObservable(key, group, this);
- this.destination.next(groupedObservable);
- if (this.durationSelector) {
- let duration;
- try {
- duration = this.durationSelector(new GroupedObservable(key, group));
- }
- catch (err) {
- this.error(err);
- return;
- }
- this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
- }
- }
- if (!group.closed) {
- group.next(element);
- }
- }
- _error(err) {
- const groups = this.groups;
- if (groups) {
- groups.forEach((group, key) => {
- group.error(err);
- });
- groups.clear();
- }
- this.destination.error(err);
- }
- _complete() {
- const groups = this.groups;
- if (groups) {
- groups.forEach((group, key) => {
- group.complete();
- });
- groups.clear();
- }
- this.destination.complete();
- }
- removeGroup(key) {
- this.groups.delete(key);
- }
- unsubscribe() {
- if (!this.closed) {
- this.attemptedToUnsubscribe = true;
- if (this.count === 0) {
- super.unsubscribe();
- }
- }
- }
-}
-class GroupDurationSubscriber extends Subscriber {
- constructor(key, group, parent) {
- super(group);
- this.key = key;
- this.group = group;
- this.parent = parent;
- }
- _next(value) {
- this.complete();
- }
- _unsubscribe() {
- const { parent, key } = this;
- this.key = this.parent = null;
- if (parent) {
- parent.removeGroup(key);
- }
- }
-}
-export class GroupedObservable extends Observable {
- constructor(key, groupSubject, refCountSubscription) {
- super();
- this.key = key;
- this.groupSubject = groupSubject;
- this.refCountSubscription = refCountSubscription;
- }
- _subscribe(subscriber) {
- const subscription = new Subscription();
- const { refCountSubscription, groupSubject } = this;
- if (refCountSubscription && !refCountSubscription.closed) {
- subscription.add(new InnerRefCountSubscription(refCountSubscription));
- }
- subscription.add(groupSubject.subscribe(subscriber));
- return subscription;
- }
-}
-class InnerRefCountSubscription extends Subscription {
- constructor(parent) {
- super();
- this.parent = parent;
- parent.count++;
- }
- unsubscribe() {
- const parent = this.parent;
- if (!parent.closed && !this.closed) {
- super.unsubscribe();
- parent.count -= 1;
- if (parent.count === 0 && parent.attemptedToUnsubscribe) {
- parent.unsubscribe();
- }
- }
- }
-}
-//# sourceMappingURL=groupBy.js.map
\ No newline at end of file