1 import { EmptyError } from '../util/EmptyError';
2 import { Observable } from '../Observable';
3 import { Operator } from '../Operator';
4 import { Subscriber } from '../Subscriber';
5 import { TeardownLogic, MonoTypeOperatorFunction } from '../types';
8 * If the source observable completes without emitting a value, it will emit
9 * an error. The error will be created at that time by the optional
10 * `errorFactory` argument, otherwise, the error will be {@link EmptyError}.
12 * ![](throwIfEmpty.png)
16 * import { fromEvent, timer } from 'rxjs';
17 * import { throwIfEmpty, takeUntil } from 'rxjs/operators';
19 * const click$ = fromEvent(document, 'click');
22 * takeUntil(timer(1000)),
24 * () => new Error('the document was not clicked within 1 second')
28 * next() { console.log('The button was clicked'); },
29 * error(err) { console.error(err); }
33 * @param errorFactory A factory function called to produce the
34 * error to be thrown when the source observable completes without emitting a
37 export function throwIfEmpty <T>(errorFactory: (() => any) = defaultErrorFactory): MonoTypeOperatorFunction<T> {
38 return (source: Observable<T>) => {
39 return source.lift(new ThrowIfEmptyOperator(errorFactory));
43 class ThrowIfEmptyOperator<T> implements Operator<T, T> {
44 constructor(private errorFactory: () => any) {
47 call(subscriber: Subscriber<T>, source: any): TeardownLogic {
48 return source.subscribe(new ThrowIfEmptySubscriber(subscriber, this.errorFactory));
52 class ThrowIfEmptySubscriber<T> extends Subscriber<T> {
53 private hasValue: boolean = false;
55 constructor(destination: Subscriber<T>, private errorFactory: () => any) {
59 protected _next(value: T): void {
61 this.destination.next(value);
64 protected _complete() {
68 err = this.errorFactory();
72 this.destination.error(err);
74 return this.destination.complete();
79 function defaultErrorFactory() {
80 return new EmptyError();