1 import { Operator } from './Operator';
2 import { Observable } from './Observable';
3 import { Subscriber } from './Subscriber';
4 import { Subscription } from './Subscription';
5 import { Observer, SubscriptionLike, TeardownLogic } from './types';
6 import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
7 import { SubjectSubscription } from './SubjectSubscription';
8 import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
11 * @class SubjectSubscriber<T>
13 export class SubjectSubscriber<T> extends Subscriber<T> {
14 constructor(protected destination: Subject<T>) {
20 * A Subject is a special type of Observable that allows values to be
21 * multicasted to many Observers. Subjects are like EventEmitters.
23 * Every Subject is an Observable and an Observer. You can subscribe to a
24 * Subject, and you can call next to feed values as well as error and complete.
28 export class Subject<T> extends Observable<T> implements SubscriptionLike {
30 [rxSubscriberSymbol]() {
31 return new SubjectSubscriber(this);
34 observers: Observer<T>[] = [];
42 thrownError: any = null;
49 * @deprecated use new Subject() instead
51 static create: Function = <T>(destination: Observer<T>, source: Observable<T>): AnonymousSubject<T> => {
52 return new AnonymousSubject<T>(destination, source);
55 lift<R>(operator: Operator<T, R>): Observable<R> {
56 const subject = new AnonymousSubject(this, this);
57 subject.operator = <any>operator;
63 throw new ObjectUnsubscribedError();
65 if (!this.isStopped) {
66 const { observers } = this;
67 const len = observers.length;
68 const copy = observers.slice();
69 for (let i = 0; i < len; i++) {
77 throw new ObjectUnsubscribedError();
80 this.thrownError = err;
81 this.isStopped = true;
82 const { observers } = this;
83 const len = observers.length;
84 const copy = observers.slice();
85 for (let i = 0; i < len; i++) {
88 this.observers.length = 0;
93 throw new ObjectUnsubscribedError();
95 this.isStopped = true;
96 const { observers } = this;
97 const len = observers.length;
98 const copy = observers.slice();
99 for (let i = 0; i < len; i++) {
102 this.observers.length = 0;
106 this.isStopped = true;
108 this.observers = null;
111 /** @deprecated This is an internal implementation detail, do not use. */
112 _trySubscribe(subscriber: Subscriber<T>): TeardownLogic {
114 throw new ObjectUnsubscribedError();
116 return super._trySubscribe(subscriber);
120 /** @deprecated This is an internal implementation detail, do not use. */
121 _subscribe(subscriber: Subscriber<T>): Subscription {
123 throw new ObjectUnsubscribedError();
124 } else if (this.hasError) {
125 subscriber.error(this.thrownError);
126 return Subscription.EMPTY;
127 } else if (this.isStopped) {
128 subscriber.complete();
129 return Subscription.EMPTY;
131 this.observers.push(subscriber);
132 return new SubjectSubscription(this, subscriber);
137 * Creates a new Observable with this Subject as the source. You can do this
138 * to create customize Observer-side logic of the Subject and conceal it from
139 * code that uses the Observable.
140 * @return {Observable} Observable that the Subject casts to
142 asObservable(): Observable<T> {
143 const observable = new Observable<T>();
144 (<any>observable).source = this;
150 * @class AnonymousSubject<T>
152 export class AnonymousSubject<T> extends Subject<T> {
153 constructor(protected destination?: Observer<T>, source?: Observable<T>) {
155 this.source = source;
159 const { destination } = this;
160 if (destination && destination.next) {
161 destination.next(value);
166 const { destination } = this;
167 if (destination && destination.error) {
168 this.destination.error(err);
173 const { destination } = this;
174 if (destination && destination.complete) {
175 this.destination.complete();
179 /** @deprecated This is an internal implementation detail, do not use. */
180 _subscribe(subscriber: Subscriber<T>): Subscription {
181 const { source } = this;
183 return this.source.subscribe(subscriber);
185 return Subscription.EMPTY;