--- /dev/null
+"use strict";
+/* --------------------------------------------------------------------------------------------
+ * Copyright (c) Microsoft Corporation. All rights reserved.
+ * Licensed under the MIT License. See License.txt in the project root for license information.
+ * ------------------------------------------------------------------------------------------ */
+Object.defineProperty(exports, "__esModule", { value: true });
+exports.ReadableStreamMessageReader = exports.AbstractMessageReader = exports.MessageReader = void 0;
+const ral_1 = require("./ral");
+const Is = require("./is");
+const events_1 = require("./events");
+var MessageReader;
+(function (MessageReader) {
+ function is(value) {
+ let candidate = value;
+ return candidate && Is.func(candidate.listen) && Is.func(candidate.dispose) &&
+ Is.func(candidate.onError) && Is.func(candidate.onClose) && Is.func(candidate.onPartialMessage);
+ }
+ MessageReader.is = is;
+})(MessageReader = exports.MessageReader || (exports.MessageReader = {}));
+class AbstractMessageReader {
+ constructor() {
+ this.errorEmitter = new events_1.Emitter();
+ this.closeEmitter = new events_1.Emitter();
+ this.partialMessageEmitter = new events_1.Emitter();
+ }
+ dispose() {
+ this.errorEmitter.dispose();
+ this.closeEmitter.dispose();
+ }
+ get onError() {
+ return this.errorEmitter.event;
+ }
+ fireError(error) {
+ this.errorEmitter.fire(this.asError(error));
+ }
+ get onClose() {
+ return this.closeEmitter.event;
+ }
+ fireClose() {
+ this.closeEmitter.fire(undefined);
+ }
+ get onPartialMessage() {
+ return this.partialMessageEmitter.event;
+ }
+ firePartialMessage(info) {
+ this.partialMessageEmitter.fire(info);
+ }
+ asError(error) {
+ if (error instanceof Error) {
+ return error;
+ }
+ else {
+ return new Error(`Reader received error. Reason: ${Is.string(error.message) ? error.message : 'unknown'}`);
+ }
+ }
+}
+exports.AbstractMessageReader = AbstractMessageReader;
+var ResolvedMessageReaderOptions;
+(function (ResolvedMessageReaderOptions) {
+ function fromOptions(options) {
+ var _a;
+ let charset;
+ let result;
+ let contentDecoder;
+ const contentDecoders = new Map();
+ let contentTypeDecoder;
+ const contentTypeDecoders = new Map();
+ if (options === undefined || typeof options === 'string') {
+ charset = options !== null && options !== void 0 ? options : 'utf-8';
+ }
+ else {
+ charset = (_a = options.charset) !== null && _a !== void 0 ? _a : 'utf-8';
+ if (options.contentDecoder !== undefined) {
+ contentDecoder = options.contentDecoder;
+ contentDecoders.set(contentDecoder.name, contentDecoder);
+ }
+ if (options.contentDecoders !== undefined) {
+ for (const decoder of options.contentDecoders) {
+ contentDecoders.set(decoder.name, decoder);
+ }
+ }
+ if (options.contentTypeDecoder !== undefined) {
+ contentTypeDecoder = options.contentTypeDecoder;
+ contentTypeDecoders.set(contentTypeDecoder.name, contentTypeDecoder);
+ }
+ if (options.contentTypeDecoders !== undefined) {
+ for (const decoder of options.contentTypeDecoders) {
+ contentTypeDecoders.set(decoder.name, decoder);
+ }
+ }
+ }
+ if (contentTypeDecoder === undefined) {
+ contentTypeDecoder = ral_1.default().applicationJson.decoder;
+ contentTypeDecoders.set(contentTypeDecoder.name, contentTypeDecoder);
+ }
+ return { charset, contentDecoder, contentDecoders, contentTypeDecoder, contentTypeDecoders };
+ }
+ ResolvedMessageReaderOptions.fromOptions = fromOptions;
+})(ResolvedMessageReaderOptions || (ResolvedMessageReaderOptions = {}));
+class ReadableStreamMessageReader extends AbstractMessageReader {
+ constructor(readable, options) {
+ super();
+ this.readable = readable;
+ this.options = ResolvedMessageReaderOptions.fromOptions(options);
+ this.buffer = ral_1.default().messageBuffer.create(this.options.charset);
+ this._partialMessageTimeout = 10000;
+ this.nextMessageLength = -1;
+ this.messageToken = 0;
+ }
+ set partialMessageTimeout(timeout) {
+ this._partialMessageTimeout = timeout;
+ }
+ get partialMessageTimeout() {
+ return this._partialMessageTimeout;
+ }
+ listen(callback) {
+ this.nextMessageLength = -1;
+ this.messageToken = 0;
+ this.partialMessageTimer = undefined;
+ this.callback = callback;
+ const result = this.readable.onData((data) => {
+ this.onData(data);
+ });
+ this.readable.onError((error) => this.fireError(error));
+ this.readable.onClose(() => this.fireClose());
+ return result;
+ }
+ onData(data) {
+ this.buffer.append(data);
+ while (true) {
+ if (this.nextMessageLength === -1) {
+ const headers = this.buffer.tryReadHeaders();
+ if (!headers) {
+ return;
+ }
+ const contentLength = headers.get('Content-Length');
+ if (!contentLength) {
+ throw new Error('Header must provide a Content-Length property.');
+ }
+ const length = parseInt(contentLength);
+ if (isNaN(length)) {
+ throw new Error('Content-Length value must be a number.');
+ }
+ this.nextMessageLength = length;
+ }
+ const body = this.buffer.tryReadBody(this.nextMessageLength);
+ if (body === undefined) {
+ /** We haven't received the full message yet. */
+ this.setPartialMessageTimer();
+ return;
+ }
+ this.clearPartialMessageTimer();
+ this.nextMessageLength = -1;
+ let p;
+ if (this.options.contentDecoder !== undefined) {
+ p = this.options.contentDecoder.decode(body);
+ }
+ else {
+ p = Promise.resolve(body);
+ }
+ p.then((value) => {
+ this.options.contentTypeDecoder.decode(value, this.options).then((msg) => {
+ this.callback(msg);
+ }, (error) => {
+ this.fireError(error);
+ });
+ }, (error) => {
+ this.fireError(error);
+ });
+ }
+ }
+ clearPartialMessageTimer() {
+ if (this.partialMessageTimer) {
+ ral_1.default().timer.clearTimeout(this.partialMessageTimer);
+ this.partialMessageTimer = undefined;
+ }
+ }
+ setPartialMessageTimer() {
+ this.clearPartialMessageTimer();
+ if (this._partialMessageTimeout <= 0) {
+ return;
+ }
+ this.partialMessageTimer = ral_1.default().timer.setTimeout((token, timeout) => {
+ this.partialMessageTimer = undefined;
+ if (token === this.messageToken) {
+ this.firePartialMessage({ messageToken: token, waitingTime: timeout });
+ this.setPartialMessageTimer();
+ }
+ }, this._partialMessageTimeout, this.messageToken, this._partialMessageTimeout);
+ }
+}
+exports.ReadableStreamMessageReader = ReadableStreamMessageReader;
+//# sourceMappingURL=messageReader.js.map
\ No newline at end of file