1 import { isFunction } from './util/isFunction';
2 import { empty as emptyObserver } from './Observer';
3 import { Observer, PartialObserver, TeardownLogic } from './types';
4 import { Subscription } from './Subscription';
5 import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
6 import { config } from './config';
7 import { hostReportError } from './util/hostReportError';
10 * Implements the {@link Observer} interface and extends the
11 * {@link Subscription} class. While the {@link Observer} is the public API for
12 * consuming the values of an {@link Observable}, all Observers get converted to
13 * a Subscriber, in order to provide Subscription-like capabilities such as
14 * `unsubscribe`. Subscriber is a common type in RxJS, and crucial for
15 * implementing operators, but it is rarely used as a public API.
17 * @class Subscriber<T>
19 export class Subscriber<T> extends Subscription implements Observer<T> {
21 [rxSubscriberSymbol]() { return this; }
24 * A static factory for a Subscriber, given a (potentially partial) definition
26 * @param {function(x: ?T): void} [next] The `next` callback of an Observer.
27 * @param {function(e: ?any): void} [error] The `error` callback of an
29 * @param {function(): void} [complete] The `complete` callback of an
31 * @return {Subscriber<T>} A Subscriber wrapping the (partially defined)
32 * Observer represented by the given arguments.
35 static create<T>(next?: (x?: T) => void,
36 error?: (e?: any) => void,
37 complete?: () => void): Subscriber<T> {
38 const subscriber = new Subscriber(next, error, complete);
39 subscriber.syncErrorThrowable = false;
43 /** @internal */ syncErrorValue: any = null;
44 /** @internal */ syncErrorThrown: boolean = false;
45 /** @internal */ syncErrorThrowable: boolean = false;
47 protected isStopped: boolean = false;
48 protected destination: PartialObserver<any> | Subscriber<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)
51 * @param {Observer|function(value: T): void} [destinationOrNext] A partially
52 * defined Observer or a `next` callback function.
53 * @param {function(e: ?any): void} [error] The `error` callback of an
55 * @param {function(): void} [complete] The `complete` callback of an
58 constructor(destinationOrNext?: PartialObserver<any> | ((value: T) => void),
59 error?: (e?: any) => void,
60 complete?: () => void) {
63 switch (arguments.length) {
65 this.destination = emptyObserver;
68 if (!destinationOrNext) {
69 this.destination = emptyObserver;
72 if (typeof destinationOrNext === 'object') {
73 if (destinationOrNext instanceof Subscriber) {
74 this.syncErrorThrowable = destinationOrNext.syncErrorThrowable;
75 this.destination = destinationOrNext;
76 destinationOrNext.add(this);
78 this.syncErrorThrowable = true;
79 this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
84 this.syncErrorThrowable = true;
85 this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
91 * The {@link Observer} callback to receive notifications of type `next` from
92 * the Observable, with a value. The Observable may call this method 0 or more
94 * @param {T} [value] The `next` value.
97 next(value?: T): void {
98 if (!this.isStopped) {
104 * The {@link Observer} callback to receive notifications of type `error` from
105 * the Observable, with an attached `Error`. Notifies the Observer that
106 * the Observable has experienced an error condition.
107 * @param {any} [err] The `error` exception.
110 error(err?: any): void {
111 if (!this.isStopped) {
112 this.isStopped = true;
118 * The {@link Observer} callback to receive a valueless notification of type
119 * `complete` from the Observable. Notifies the Observer that the Observable
120 * has finished sending push-based notifications.
124 if (!this.isStopped) {
125 this.isStopped = true;
130 unsubscribe(): void {
134 this.isStopped = true;
138 protected _next(value: T): void {
139 this.destination.next(value);
142 protected _error(err: any): void {
143 this.destination.error(err);
147 protected _complete(): void {
148 this.destination.complete();
152 /** @deprecated This is an internal implementation detail, do not use. */
153 _unsubscribeAndRecycle(): Subscriber<T> {
154 const { _parentOrParents } = this;
155 this._parentOrParents = null;
158 this.isStopped = false;
159 this._parentOrParents = _parentOrParents;
165 * We need this JSDoc comment for affecting ESDoc.
169 export class SafeSubscriber<T> extends Subscriber<T> {
171 private _context: any;
173 constructor(private _parentSubscriber: Subscriber<T>,
174 observerOrNext?: PartialObserver<T> | ((value: T) => void),
175 error?: (e?: any) => void,
176 complete?: () => void) {
179 let next: ((value: T) => void);
180 let context: any = this;
182 if (isFunction(observerOrNext)) {
183 next = (<((value: T) => void)> observerOrNext);
184 } else if (observerOrNext) {
185 next = (<PartialObserver<T>> observerOrNext).next;
186 error = (<PartialObserver<T>> observerOrNext).error;
187 complete = (<PartialObserver<T>> observerOrNext).complete;
188 if (observerOrNext !== emptyObserver) {
189 context = Object.create(observerOrNext);
190 if (isFunction(context.unsubscribe)) {
191 this.add(<() => void> context.unsubscribe.bind(context));
193 context.unsubscribe = this.unsubscribe.bind(this);
197 this._context = context;
200 this._complete = complete;
203 next(value?: T): void {
204 if (!this.isStopped && this._next) {
205 const { _parentSubscriber } = this;
206 if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
207 this.__tryOrUnsub(this._next, value);
208 } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
214 error(err?: any): void {
215 if (!this.isStopped) {
216 const { _parentSubscriber } = this;
217 const { useDeprecatedSynchronousErrorHandling } = config;
219 if (!useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
220 this.__tryOrUnsub(this._error, err);
223 this.__tryOrSetError(_parentSubscriber, this._error, err);
226 } else if (!_parentSubscriber.syncErrorThrowable) {
228 if (useDeprecatedSynchronousErrorHandling) {
231 hostReportError(err);
233 if (useDeprecatedSynchronousErrorHandling) {
234 _parentSubscriber.syncErrorValue = err;
235 _parentSubscriber.syncErrorThrown = true;
237 hostReportError(err);
245 if (!this.isStopped) {
246 const { _parentSubscriber } = this;
247 if (this._complete) {
248 const wrappedComplete = () => this._complete.call(this._context);
250 if (!config.useDeprecatedSynchronousErrorHandling || !_parentSubscriber.syncErrorThrowable) {
251 this.__tryOrUnsub(wrappedComplete);
254 this.__tryOrSetError(_parentSubscriber, wrappedComplete);
263 private __tryOrUnsub(fn: Function, value?: any): void {
265 fn.call(this._context, value);
268 if (config.useDeprecatedSynchronousErrorHandling) {
271 hostReportError(err);
276 private __tryOrSetError(parent: Subscriber<T>, fn: Function, value?: any): boolean {
277 if (!config.useDeprecatedSynchronousErrorHandling) {
278 throw new Error('bad call');
281 fn.call(this._context, value);
283 if (config.useDeprecatedSynchronousErrorHandling) {
284 parent.syncErrorValue = err;
285 parent.syncErrorThrown = true;
288 hostReportError(err);
295 /** @internal This is an internal implementation detail, do not use. */
296 _unsubscribe(): void {
297 const { _parentSubscriber } = this;
298 this._context = null;
299 this._parentSubscriber = null;
300 _parentSubscriber.unsubscribe();