2 // https://github.com/tc39/proposal-observable
3 var $ = require('../internals/export');
4 var global = require('../internals/global');
5 var call = require('../internals/function-call');
6 var DESCRIPTORS = require('../internals/descriptors');
7 var setSpecies = require('../internals/set-species');
8 var aCallable = require('../internals/a-callable');
9 var isCallable = require('../internals/is-callable');
10 var isConstructor = require('../internals/is-constructor');
11 var anObject = require('../internals/an-object');
12 var isObject = require('../internals/is-object');
13 var anInstance = require('../internals/an-instance');
14 var defineProperty = require('../internals/object-define-property').f;
15 var redefine = require('../internals/redefine');
16 var redefineAll = require('../internals/redefine-all');
17 var getIterator = require('../internals/get-iterator');
18 var getMethod = require('../internals/get-method');
19 var iterate = require('../internals/iterate');
20 var hostReportErrors = require('../internals/host-report-errors');
21 var wellKnownSymbol = require('../internals/well-known-symbol');
22 var InternalStateModule = require('../internals/internal-state');
24 var OBSERVABLE = wellKnownSymbol('observable');
25 var getInternalState = InternalStateModule.get;
26 var setInternalState = InternalStateModule.set;
27 var Array = global.Array;
29 var cleanupSubscription = function (subscriptionState) {
30 var cleanup = subscriptionState.cleanup;
32 subscriptionState.cleanup = undefined;
36 hostReportErrors(error);
41 var subscriptionClosed = function (subscriptionState) {
42 return subscriptionState.observer === undefined;
45 var close = function (subscriptionState) {
46 var subscription = subscriptionState.facade;
48 subscription.closed = true;
49 var subscriptionObserver = subscriptionState.subscriptionObserver;
50 if (subscriptionObserver) subscriptionObserver.closed = true;
51 } subscriptionState.observer = undefined;
54 var Subscription = function (observer, subscriber) {
55 var subscriptionState = setInternalState(this, {
57 observer: anObject(observer),
58 subscriptionObserver: undefined
61 if (!DESCRIPTORS) this.closed = false;
63 if (start = getMethod(observer, 'start')) call(start, observer, this);
65 hostReportErrors(error);
67 if (subscriptionClosed(subscriptionState)) return;
68 var subscriptionObserver = subscriptionState.subscriptionObserver = new SubscriptionObserver(this);
70 var cleanup = subscriber(subscriptionObserver);
71 var subscription = cleanup;
72 if (cleanup != null) subscriptionState.cleanup = isCallable(cleanup.unsubscribe)
73 ? function () { subscription.unsubscribe(); }
76 subscriptionObserver.error(error);
78 } if (subscriptionClosed(subscriptionState)) cleanupSubscription(subscriptionState);
81 Subscription.prototype = redefineAll({}, {
82 unsubscribe: function unsubscribe() {
83 var subscriptionState = getInternalState(this);
84 if (!subscriptionClosed(subscriptionState)) {
85 close(subscriptionState);
86 cleanupSubscription(subscriptionState);
91 if (DESCRIPTORS) defineProperty(Subscription.prototype, 'closed', {
94 return subscriptionClosed(getInternalState(this));
98 var SubscriptionObserver = function (subscription) {
99 setInternalState(this, { subscription: subscription });
100 if (!DESCRIPTORS) this.closed = false;
103 SubscriptionObserver.prototype = redefineAll({}, {
104 next: function next(value) {
105 var subscriptionState = getInternalState(getInternalState(this).subscription);
106 if (!subscriptionClosed(subscriptionState)) {
107 var observer = subscriptionState.observer;
109 var nextMethod = getMethod(observer, 'next');
110 if (nextMethod) call(nextMethod, observer, value);
112 hostReportErrors(error);
116 error: function error(value) {
117 var subscriptionState = getInternalState(getInternalState(this).subscription);
118 if (!subscriptionClosed(subscriptionState)) {
119 var observer = subscriptionState.observer;
120 close(subscriptionState);
122 var errorMethod = getMethod(observer, 'error');
123 if (errorMethod) call(errorMethod, observer, value);
124 else hostReportErrors(value);
126 hostReportErrors(err);
127 } cleanupSubscription(subscriptionState);
130 complete: function complete() {
131 var subscriptionState = getInternalState(getInternalState(this).subscription);
132 if (!subscriptionClosed(subscriptionState)) {
133 var observer = subscriptionState.observer;
134 close(subscriptionState);
136 var completeMethod = getMethod(observer, 'complete');
137 if (completeMethod) call(completeMethod, observer);
139 hostReportErrors(error);
140 } cleanupSubscription(subscriptionState);
145 if (DESCRIPTORS) defineProperty(SubscriptionObserver.prototype, 'closed', {
148 return subscriptionClosed(getInternalState(getInternalState(this).subscription));
152 var $Observable = function Observable(subscriber) {
153 anInstance(this, ObservablePrototype);
154 setInternalState(this, { subscriber: aCallable(subscriber) });
157 var ObservablePrototype = $Observable.prototype;
159 redefineAll(ObservablePrototype, {
160 subscribe: function subscribe(observer) {
161 var length = arguments.length;
162 return new Subscription(isCallable(observer) ? {
164 error: length > 1 ? arguments[1] : undefined,
165 complete: length > 2 ? arguments[2] : undefined
166 } : isObject(observer) ? observer : {}, getInternalState(this).subscriber);
170 redefineAll($Observable, {
171 from: function from(x) {
172 var C = isConstructor(this) ? this : $Observable;
173 var observableMethod = getMethod(anObject(x), OBSERVABLE);
174 if (observableMethod) {
175 var observable = anObject(call(observableMethod, x));
176 return observable.constructor === C ? observable : new C(function (observer) {
177 return observable.subscribe(observer);
180 var iterator = getIterator(x);
181 return new C(function (observer) {
182 iterate(iterator, function (it, stop) {
184 if (observer.closed) return stop();
185 }, { IS_ITERATOR: true, INTERRUPTED: true });
190 var C = isConstructor(this) ? this : $Observable;
191 var length = arguments.length;
192 var items = Array(length);
194 while (index < length) items[index] = arguments[index++];
195 return new C(function (observer) {
196 for (var i = 0; i < length; i++) {
197 observer.next(items[i]);
198 if (observer.closed) return;
199 } observer.complete();
204 redefine(ObservablePrototype, OBSERVABLE, function () { return this; });
206 $({ global: true }, {
207 Observable: $Observable
210 setSpecies('Observable');