chore(cli): bump version
[crowdnode.js/.git] / lib / ws.js
1 "use strict";
2
3 let Ws = module.exports;
4
5 let Cookies = require("../lib/cookies.js");
6 let request = require("./request.js");
7
8 let WSClient = require("ws");
9
10 /**
11  * @param {Object} opts
12  * @param {String} opts.baseUrl
13  * @param {CookieStore} opts.cookieStore
14  * @param {Boolean} opts.debug
15  * @param {Function} opts.onClose
16  * @param {Function} opts.onError
17  * @param {Function} opts.onMessage
18  */
19 Ws.create = function ({
20   baseUrl,
21   cookieStore,
22   debug,
23   onClose,
24   onError,
25   onMessage,
26 }) {
27   let wsc = {};
28
29   let defaultHeaders = {
30     /*
31     //'Accept-Encoding': gzip, deflate, br
32     "Accept-Language": "en-US,en;q=0.9",
33     "Cache-Control": "no-cache",
34     Origin: "https://insight.dash.org",
35     referer: "https://insight.dash.org/insight/",
36     "sec-fetch-dest": "empty",
37     "sec-fetch-mode": "cors",
38     "sec-fetch-site": "same-origin",
39     "sec-gpc": "1",
40     */
41   };
42
43   let Eio3 = {};
44   /*
45   let httpAgent = new Https.Agent({
46     keepAlive: true,
47     maxSockets: 2,
48   });
49   */
50
51   // Get `sid` (session id) and ping/pong params
52   Eio3.connect = async function () {
53     let now = Date.now();
54     let sidUrl = `${baseUrl}/socket.io/?EIO=3&transport=polling&t=${now}`;
55
56     let cookies = await cookieStore.get(sidUrl);
57     let sidResp = await request({
58       //agent: httpAgent,
59       url: sidUrl,
60       headers: Object.assign(
61         {
62           Cookie: cookies,
63         },
64         defaultHeaders,
65       ),
66       json: false,
67     });
68     if (!sidResp.ok) {
69       console.error(sidResp.toJSON());
70       throw new Error("bad response");
71     }
72     await cookieStore.set(sidUrl, sidResp);
73
74     // ex: `97:0{"sid":"xxxx",...}`
75     let msg = sidResp.body;
76     let colonIndex = msg.indexOf(":");
77     // 0 is CONNECT, which will always follow our first message
78     let start = colonIndex + ":0".length;
79     let len = parseInt(msg.slice(0, colonIndex), 10);
80     let json = msg.slice(start, start + (len - 1));
81
82     //console.log("Socket.io Connect:");
83     //console.log(msg);
84     //console.log(json);
85
86     // @type {SocketIoHello}
87     let session = JSON.parse(json);
88     return session;
89   };
90
91   /**
92    * @param {String} sid
93    * @param {String} eventname
94    */
95   Eio3.subscribe = async function (sid, eventname) {
96     let now = Date.now();
97     let subUrl = `${baseUrl}/socket.io/?EIO=3&transport=polling&t=${now}&sid=${sid}`;
98     let sub = JSON.stringify(["subscribe", eventname]);
99     // not really sure what this is, couldn't find documentation for it
100     let typ = 422; // 4 = MESSAGE, 2 = EVENT, 2 = ???
101     let msg = `${typ}${sub}`;
102     let len = msg.length;
103     let body = `${len}:${msg}`;
104
105     let cookies = await cookieStore.get(subUrl);
106     let subResp = await request({
107       //agent: httpAgent,
108       method: "POST",
109       url: subUrl,
110       headers: Object.assign(
111         {
112           "Content-Type": "text/plain;charset=UTF-8",
113           Cookie: cookies,
114         },
115         defaultHeaders,
116       ),
117       body: body,
118     });
119     if (!subResp.ok) {
120       console.error(subResp.toJSON());
121       throw new Error("bad response");
122     }
123     await cookieStore.set(subUrl, subResp);
124
125     return subResp.body;
126   };
127
128   /*
129   Eio3.poll = async function (sid) {
130     let now = Date.now();
131     let pollUrl = `${baseUrl}/socket.io/?EIO=3&transport=polling&t=${now}&sid=${sid}`;
132
133     let cookies = await cookieStore.get(pollUrl);
134     let pollResp = await request({
135       //agent: httpAgent,
136       method: "GET",
137       url: pollUrl,
138       headers: Object.assign(
139         {
140           Cookie: cookies,
141         },
142         defaultHeaders,
143       ),
144     });
145     if (!pollResp.ok) {
146       console.error(pollResp.toJSON());
147       throw new Error("bad response");
148     }
149     await cookieStore.set(pollUrl, pollResp);
150
151     return pollResp.body;
152   };
153   */
154
155   /**
156    * @param {String} sid - session id (associated with AWS ALB cookie)
157    */
158   Eio3.connectWs = async function (sid) {
159     baseUrl = baseUrl.slice(4); // trim leading 'http'
160     let url =
161       `ws${baseUrl}/socket.io/?EIO=3&transport=websocket&sid=${sid}`.replace(
162         "http",
163         "ws",
164       );
165
166     let cookies = await cookieStore.get(`${baseUrl}/`);
167     let ws = new WSClient(url, {
168       //agent: httpAgent,
169       //perMessageDeflate: false,
170       //@ts-ignore - type info is wrong
171       headers: Object.assign(
172         {
173           Cookie: cookies,
174         },
175         defaultHeaders,
176       ),
177     });
178
179     let promise = new Promise(function (resolve) {
180       ws.on("open", function open() {
181         if (debug) {
182           console.debug("=> Socket.io Hello ('2probe')");
183         }
184         ws.send("2probe");
185       });
186
187       ws.once("error", function (err) {
188         if (onError) {
189           onError(err);
190         } else {
191           console.error("WebSocket Error:");
192           console.error(err);
193         }
194       });
195
196       ws.once("message", function message(data) {
197         if ("3probe" === data.toString()) {
198           if (debug) {
199             console.debug("<= Socket.io Welcome ('3probe')");
200           }
201           ws.send("5"); // no idea, but necessary
202           if (debug) {
203             console.debug("=> Socket.io ACK? ('5')");
204           }
205         } else {
206           console.error("Unrecognized WebSocket Hello:");
207           console.error(data.toString());
208           // reject()
209           process.exit(1);
210         }
211         resolve(ws);
212       });
213     });
214
215     return await promise;
216   };
217
218   /** @type import('ws')? */
219   wsc._ws = null;
220
221   wsc.init = async function () {
222     let session = await Eio3.connect();
223     if (debug) {
224       console.debug("Socket.io Session:");
225       console.debug(session);
226       console.debug();
227     }
228
229     let sub = await Eio3.subscribe(session.sid, "inv");
230     if (debug) {
231       console.debug("Socket.io Subscription:");
232       console.debug(sub);
233       console.debug();
234     }
235
236     /*
237     let poll = await Eio3.poll(session.sid);
238     if (debug) {
239       console.debug("Socket.io Confirm:");
240       console.debug(poll);
241       console.debug();
242     }
243     */
244
245     let ws = await Eio3.connectWs(session.sid);
246     wsc._ws = ws;
247
248     setPing();
249     ws.on("message", _onMessage);
250     ws.once("close", _onClose);
251
252     function setPing() {
253       setTimeout(function () {
254         //ws.ping(); // standard
255         ws.send("2"); // socket.io
256         if (debug) {
257           console.debug("=> Socket.io Ping");
258         }
259       }, session.pingInterval);
260     }
261
262     /**
263      * @param {Buffer} buf
264      */
265     function _onMessage(buf) {
266       let msg = buf.toString();
267       if ("3" === msg.toString()) {
268         if (debug) {
269           console.debug("<= Socket.io Pong");
270           console.debug();
271         }
272         setPing();
273         return;
274       }
275
276       if ("42" !== msg.slice(0, 2)) {
277         console.warn("Unknown message:");
278         console.warn(msg);
279         return;
280       }
281
282       /** @type {InsightPush} */
283       let [evname, data] = JSON.parse(msg.slice(2));
284       if (onMessage) {
285         onMessage(evname, data);
286       }
287       switch (evname) {
288         case "tx":
289         /* falls through */
290         case "txlock":
291         /* falls through */
292         case "block":
293         /* falls through */
294         default:
295           // TODO put check function here
296           if (debug) {
297             console.debug(`Received '${evname}':`);
298             console.debug(data);
299             console.debug();
300           }
301       }
302     }
303
304     function _onClose() {
305       if (debug) {
306         console.debug("WebSocket Close");
307       }
308       if (onClose) {
309         onClose();
310       }
311     }
312   };
313
314   wsc.close = function () {
315     wsc._ws?.close();
316   };
317
318   return wsc;
319 };
320
321 /**
322  * @param {String} baseUrl
323  * @param {Function} find
324  */
325 Ws.listen = async function (baseUrl, find) {
326   let ws;
327   let p = new Promise(async function (resolve, reject) {
328     //@ts-ignore
329     ws = Ws.create({
330       baseUrl: baseUrl,
331       cookieStore: Cookies,
332       //debug: true,
333       onClose: resolve,
334       onError: reject,
335       onMessage:
336         /**
337          * @param {String} evname
338          * @param {InsightSocketEventData} data
339          */
340         async function (evname, data) {
341           let result;
342           try {
343             result = await find(evname, data);
344           } catch (e) {
345             reject(e);
346             return;
347           }
348
349           if (result) {
350             resolve(result);
351           }
352         },
353     });
354
355     await ws.init().catch(reject);
356   });
357   let result = await p;
358   //@ts-ignore
359   ws.close();
360   return result;
361 };
362
363 // TODO waitForVouts(baseUrl, [{ address, satoshis }])
364
365 /**
366  * @param {String} baseUrl
367  * @param {String} addr
368  * @param {Number} [amount]
369  * @param {Number} [maxTxLockWait]
370  * @returns {Promise<SocketPayment>}
371  */
372 Ws.waitForVout = async function (
373   baseUrl,
374   addr,
375   amount = 0,
376   maxTxLockWait = 3000,
377 ) {
378   // Listen for Response
379   /** @type SocketPayment */
380   let mempoolTx;
381   return await Ws.listen(baseUrl, findResponse);
382
383   /**
384    * @param {String} evname
385    * @param {InsightSocketEventData} data
386    */
387   function findResponse(evname, data) {
388     if (!["tx", "txlock"].includes(evname)) {
389       return;
390     }
391
392     let now = Date.now();
393     if (mempoolTx?.timestamp) {
394       // don't wait longer than 3s for a txlock
395       if (now - mempoolTx.timestamp > maxTxLockWait) {
396         return mempoolTx;
397       }
398     }
399
400     let result;
401     // TODO should fetch tx and match hotwallet as vin
402     data.vout.some(function (vout) {
403       if (!(addr in vout)) {
404         return false;
405       }
406
407       let duffs = vout[addr];
408       if (amount && duffs !== amount) {
409         return false;
410       }
411
412       let newTx = {
413         address: addr,
414         timestamp: now,
415         txid: data.txid,
416         satoshis: duffs,
417         txlock: data.txlock,
418       };
419
420       if ("txlock" !== evname) {
421         if (!mempoolTx) {
422           mempoolTx = newTx;
423         }
424         return false;
425       }
426
427       result = newTx;
428       return true;
429     });
430
431     return result;
432   }
433 };
434
435 /*
436 async function sleep(ms) {
437   return await new Promise(function (resolve) {
438     setTimeout(resolve, ms);
439   });
440 }
441 */