3 var _Object$setPrototypeO;
/**
 * Transpiler helper that stores `value` under `key` on `obj`.
 *
 * When `key` is not yet reachable on `obj` (own or inherited), a plain
 * assignment is used. When it is already reachable, the property is
 * (re)defined directly on `obj` as an enumerable, configurable, writable
 * data property, so inherited setters or read-only inherited properties are
 * bypassed.
 *
 * @param {Object} obj - Target object; mutated in place.
 * @param {string|symbol} key - Property key to set.
 * @param {*} value - Value to store.
 * @returns {Object} The same `obj` that was passed in.
 */
function _defineProperty(obj, key, value) {
  // Fast path: nothing with this key exists anywhere on the prototype chain.
  if (!(key in obj)) {
    obj[key] = value;
    return obj;
  }

  // The key already exists (possibly inherited): define an own data property.
  var descriptor = {
    value: value,
    enumerable: true,
    configurable: true,
    writable: true
  };
  Object.defineProperty(obj, key, descriptor);
  return obj;
}
7 var finished = require('./end-of-stream');
// Symbol keys under which each iterator keeps its private state.

// resolve callback of the next() promise that is waiting for data (null when none).
9 var kLastResolve = Symbol('lastResolve');
// reject callback of the next() promise that is waiting for data (null when none).
10 var kLastReject = Symbol('lastReject');
// Error reported by the end-of-stream callback; read by next().
11 var kError = Symbol('error');
// Initialised from stream._readableState.endEmitted and set to true by the
// end-of-stream callback.
12 var kEnded = Symbol('ended');
// Most recent promise created by next(); reset to null when a pending promise
// is settled.
13 var kLastPromise = Symbol('lastPromise');
// Promise executor function(resolve, reject) that reads from the stream or
// stores the callbacks for later.
14 var kHandlePromise = Symbol('handlePromise');
// The underlying stream the iterator reads from.
15 var kStream = Symbol('stream');
// Builds the value handed to a next() promise's resolve callback. In this
// file it is called with (chunk, false) for data and with (undefined, true)
// once iteration is finished.
// NOTE(review): the function body is not visible in this excerpt — presumably
// it returns { value: value, done: done }; confirm against the full source.
17 function createIterResult(value, done) {
// Tries to settle the next() promise that is currently waiting for data.
// Does nothing when no resolve callback is stored on the iterator.
// NOTE(review): some lines of this function are missing from this excerpt;
// the inline comment on the read() call indicates that a null read is
// deferred rather than resolved — confirm the exact guard in the full source.
24 function readAndResolve(iter) {
25 var resolve = iter[kLastResolve];
27 if (resolve !== null) {
28 var data = iter[kStream].read(); // we defer if data is null
29 // we can be expecting either 'end' or
// Clear the pending-promise slots, then resolve with the chunk as a
// not-done result.
33 iter[kLastPromise] = null;
34 iter[kLastResolve] = null;
35 iter[kLastReject] = null;
36 resolve(createIterResult(data, false));
// Listener for the stream's 'readable' event (it is bound to the iterator
// when the iterator is created). Instead of reading synchronously it
// schedules readAndResolve(iter) on the next tick.
41 function onReadable(iter) {
42 // we wait for the next tick, because it might
43 // emit an error with process.nextTick
44 process.nextTick(readAndResolve, iter);
// Builds a Promise executor for a next() call that has to queue behind an
// earlier next() promise: the returned function waits for lastPromise to
// fulfil before settling its own promise.
// NOTE(review): the condition that selects between the two outcomes below,
// and any rejection handler passed to then(), are not visible in this
// excerpt — confirm against the full source.
47 function wrapForNext(lastPromise, iter) {
48 return function (resolve, reject) {
49 lastPromise.then(function () {
// Outcome A: report that iteration is finished.
51 resolve(createIterResult(undefined, true));
// Outcome B: hand the callbacks to the iterator's own executor, which reads
// from the stream or parks them until data arrives.
55 iter[kHandlePromise](resolve, reject);
// Object that the iterator prototype below is chained to.
// NOTE(review): as written, Object.getPrototypeOf(function () {}) evaluates to
// Function.prototype; this looks like a transpilation artifact of an
// async-generator expression — confirm before relying on it.
60 var AsyncIteratorPrototype = Object.getPrototypeOf(function () {});
// Shared prototype of the iterators built by createReadableStreamAsyncIterator.
// It defines next(), [Symbol.asyncIterator]() and return(), and has
// AsyncIteratorPrototype as its own prototype.
61 var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPrototypeO = {
// next(): returns a promise for the next iterator result.
// NOTE(review): several guard conditions are missing from this excerpt. The
// visible outcomes, in source order, are:
//   1. reject immediately with the stored error;
//   2. resolve immediately as done;
//   3. for a destroyed stream, settle on the next tick (reject with the
//      stored error or resolve as done);
//   4. queue behind the previous next() promise via wrapForNext;
//   5. resolve immediately with data read synchronously from the stream;
//   6. otherwise create a promise driven by this[kHandlePromise].
// Outcomes 4 and 6 assign `promise`, which is then remembered in
// this[kLastPromise].
66 next: function next() {
69 // if we have detected an error in the meanwhile
70 // reject straight away
71 var error = this[kError];
74 return Promise.reject(error);
78 return Promise.resolve(createIterResult(undefined, true));
81 if (this[kStream].destroyed) {
82 // We need to defer via nextTick because if .destroy(err) is
83 // called, the error will be emitted via nextTick, and
84 // we cannot guarantee that there is no error lingering around
85 // waiting to be emitted.
86 return new Promise(function (resolve, reject) {
87 process.nextTick(function () {
89 reject(_this[kError]);
91 resolve(createIterResult(undefined, true));
95 } // if we have multiple next() calls
96 // we will wait for the previous Promise to finish
97 // this logic is optimized to support for await loops,
98 // where next() is only called once at a time
101 var lastPromise = this[kLastPromise];
105 promise = new Promise(wrapForNext(lastPromise, this));
107 // fast path needed to support multiple this.push()
108 // without triggering the next() queue
109 var data = this[kStream].read();
112 return Promise.resolve(createIterResult(data, false));
115 promise = new Promise(this[kHandlePromise]);
118 this[kLastPromise] = promise;
// [Symbol.asyncIterator](): body not visible in this excerpt — presumably
// returns the iterator itself; confirm against the full source.
121 }, _defineProperty(_Object$setPrototypeO, Symbol.asyncIterator, function () {
// return(): destroys the underlying stream and returns a promise that
// resolves as done once destroy's callback runs.
// NOTE(review): how the callback's `err` argument is handled is not visible
// in this excerpt.
123 }), _defineProperty(_Object$setPrototypeO, "return", function _return() {
126 // destroy(err, cb) is a private API
127 // we can guarantee we have that here, because we control the
128 // Readable class this is attached to
129 return new Promise(function (resolve, reject) {
130 _this2[kStream].destroy(null, function (err) {
136 resolve(createIterResult(undefined, true));
139 }), _Object$setPrototypeO), AsyncIteratorPrototype);
// Creates an async iterator over `stream`.
//
// The iterator inherits from ReadableStreamAsyncIteratorPrototype and keeps
// its state in Symbol-keyed properties defined through property descriptors
// (most descriptor fields are not visible in this excerpt). The function
// also registers an end-of-stream callback via finished() and a 'readable'
// listener on the stream.
//
// stream: the readable stream to iterate; its _readableState.endEmitted flag
//         is read to seed the iterator's "ended" state.
// NOTE(review): the return statement is not visible in this excerpt —
// presumably the iterator is returned; confirm against the full source.
141 var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) {
144 var iterator = Object.create(ReadableStreamAsyncIteratorPrototype, (_Object$create = {}, _defineProperty(_Object$create, kStream, {
147 }), _defineProperty(_Object$create, kLastResolve, {
150 }), _defineProperty(_Object$create, kLastReject, {
153 }), _defineProperty(_Object$create, kError, {
156 }), _defineProperty(_Object$create, kEnded, {
// Start out as "ended" if the stream has already emitted 'end'.
157 value: stream._readableState.endEmitted,
159 }), _defineProperty(_Object$create, kHandlePromise, {
// Promise executor used by next(): try a synchronous read first.
// NOTE(review): the branch condition is not visible here. One path resolves
// at once with the data read; the other stores resolve/reject so a later
// 'readable' or end-of-stream callback can settle the promise.
160 value: function value(resolve, reject) {
161 var data = iterator[kStream].read();
164 iterator[kLastPromise] = null;
165 iterator[kLastResolve] = null;
166 iterator[kLastReject] = null;
167 resolve(createIterResult(data, false));
169 iterator[kLastResolve] = resolve;
170 iterator[kLastReject] = reject;
174 }), _Object$create));
175 iterator[kLastPromise] = null;
// End-of-stream handling. Errors other than a premature close are stored on
// the iterator (kError) after the pending-promise slots are cleared.
// NOTE(review): the call that rejects the pending promise, and whether the
// error branch returns before the code below, are not visible in this
// excerpt.
176 finished(stream, function (err) {
177 if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
178 var reject = iterator[kLastReject]; // reject if we are waiting for data in the Promise
179 // returned by next() and store the error
181 if (reject !== null) {
182 iterator[kLastPromise] = null;
183 iterator[kLastResolve] = null;
184 iterator[kLastReject] = null;
188 iterator[kError] = err;
// If a next() promise is still waiting, clear the slots and resolve it as
// done; then mark the iterator as ended.
192 var resolve = iterator[kLastResolve];
194 if (resolve !== null) {
195 iterator[kLastPromise] = null;
196 iterator[kLastResolve] = null;
197 iterator[kLastReject] = null;
198 resolve(createIterResult(undefined, true));
201 iterator[kEnded] = true;
// Each 'readable' event schedules an attempt to settle the waiting next()
// promise (see onReadable).
203 stream.on('readable', onReadable.bind(null, iterator));
207 module.exports = createReadableStreamAsyncIterator;