Spaces:
Runtime error
Runtime error
| ; | |
| Object.defineProperty(exports, "__esModule", { value: true }); | |
| exports.Socket = void 0; | |
| const events_1 = require("events"); | |
| const debug_1 = require("debug"); | |
| const timers_1 = require("timers"); | |
| const debug = (0, debug_1.default)("engine:socket"); | |
| class Socket extends events_1.EventEmitter { | |
| /** | |
| * Client class (abstract). | |
| * | |
| * @api private | |
| */ | |
| constructor(id, server, transport, req, protocol) { | |
| super(); | |
| this._readyState = "opening"; | |
| this.upgrading = false; | |
| this.upgraded = false; | |
| this.writeBuffer = []; | |
| this.packetsFn = []; | |
| this.sentCallbackFn = []; | |
| this.cleanupFn = []; | |
| this.id = id; | |
| this.server = server; | |
| this.request = req; | |
| this.protocol = protocol; | |
| // Cache IP since it might not be in the req later | |
| if (req) { | |
| if (req.websocket && req.websocket._socket) { | |
| this.remoteAddress = req.websocket._socket.remoteAddress; | |
| } | |
| else { | |
| this.remoteAddress = req.connection.remoteAddress; | |
| } | |
| } | |
| else { | |
| // TODO there is currently no way to get the IP address of the client when it connects with WebTransport | |
| // see https://github.com/fails-components/webtransport/issues/114 | |
| } | |
| this.pingTimeoutTimer = null; | |
| this.pingIntervalTimer = null; | |
| this.setTransport(transport); | |
| this.onOpen(); | |
| } | |
| get readyState() { | |
| return this._readyState; | |
| } | |
| set readyState(state) { | |
| debug("readyState updated from %s to %s", this._readyState, state); | |
| this._readyState = state; | |
| } | |
| /** | |
| * Called upon transport considered open. | |
| * | |
| * @api private | |
| */ | |
| onOpen() { | |
| this.readyState = "open"; | |
| // sends an `open` packet | |
| this.transport.sid = this.id; | |
| this.sendPacket("open", JSON.stringify({ | |
| sid: this.id, | |
| upgrades: this.getAvailableUpgrades(), | |
| pingInterval: this.server.opts.pingInterval, | |
| pingTimeout: this.server.opts.pingTimeout, | |
| maxPayload: this.server.opts.maxHttpBufferSize, | |
| })); | |
| if (this.server.opts.initialPacket) { | |
| this.sendPacket("message", this.server.opts.initialPacket); | |
| } | |
| this.emit("open"); | |
| if (this.protocol === 3) { | |
| // in protocol v3, the client sends a ping, and the server answers with a pong | |
| this.resetPingTimeout(this.server.opts.pingInterval + this.server.opts.pingTimeout); | |
| } | |
| else { | |
| // in protocol v4, the server sends a ping, and the client answers with a pong | |
| this.schedulePing(); | |
| } | |
| } | |
| /** | |
| * Called upon transport packet. | |
| * | |
| * @param {Object} packet | |
| * @api private | |
| */ | |
| onPacket(packet) { | |
| if ("open" !== this.readyState) { | |
| return debug("packet received with closed socket"); | |
| } | |
| // export packet event | |
| debug(`received packet ${packet.type}`); | |
| this.emit("packet", packet); | |
| // Reset ping timeout on any packet, incoming data is a good sign of | |
| // other side's liveness | |
| this.resetPingTimeout(this.server.opts.pingInterval + this.server.opts.pingTimeout); | |
| switch (packet.type) { | |
| case "ping": | |
| if (this.transport.protocol !== 3) { | |
| this.onError("invalid heartbeat direction"); | |
| return; | |
| } | |
| debug("got ping"); | |
| this.sendPacket("pong"); | |
| this.emit("heartbeat"); | |
| break; | |
| case "pong": | |
| if (this.transport.protocol === 3) { | |
| this.onError("invalid heartbeat direction"); | |
| return; | |
| } | |
| debug("got pong"); | |
| this.pingIntervalTimer.refresh(); | |
| this.emit("heartbeat"); | |
| break; | |
| case "error": | |
| this.onClose("parse error"); | |
| break; | |
| case "message": | |
| this.emit("data", packet.data); | |
| this.emit("message", packet.data); | |
| break; | |
| } | |
| } | |
| /** | |
| * Called upon transport error. | |
| * | |
| * @param {Error} err - error object | |
| * @api private | |
| */ | |
| onError(err) { | |
| debug("transport error"); | |
| this.onClose("transport error", err); | |
| } | |
| /** | |
| * Pings client every `this.pingInterval` and expects response | |
| * within `this.pingTimeout` or closes connection. | |
| * | |
| * @api private | |
| */ | |
| schedulePing() { | |
| this.pingIntervalTimer = (0, timers_1.setTimeout)(() => { | |
| debug("writing ping packet - expecting pong within %sms", this.server.opts.pingTimeout); | |
| this.sendPacket("ping"); | |
| this.resetPingTimeout(this.server.opts.pingTimeout); | |
| }, this.server.opts.pingInterval); | |
| } | |
| /** | |
| * Resets ping timeout. | |
| * | |
| * @api private | |
| */ | |
| resetPingTimeout(timeout) { | |
| (0, timers_1.clearTimeout)(this.pingTimeoutTimer); | |
| this.pingTimeoutTimer = (0, timers_1.setTimeout)(() => { | |
| if (this.readyState === "closed") | |
| return; | |
| this.onClose("ping timeout"); | |
| }, timeout); | |
| } | |
| /** | |
| * Attaches handlers for the given transport. | |
| * | |
| * @param {Transport} transport | |
| * @api private | |
| */ | |
| setTransport(transport) { | |
| const onError = this.onError.bind(this); | |
| const onPacket = this.onPacket.bind(this); | |
| const flush = this.flush.bind(this); | |
| const onClose = this.onClose.bind(this, "transport close"); | |
| this.transport = transport; | |
| this.transport.once("error", onError); | |
| this.transport.on("packet", onPacket); | |
| this.transport.on("drain", flush); | |
| this.transport.once("close", onClose); | |
| // this function will manage packet events (also message callbacks) | |
| this.setupSendCallback(); | |
| this.cleanupFn.push(function () { | |
| transport.removeListener("error", onError); | |
| transport.removeListener("packet", onPacket); | |
| transport.removeListener("drain", flush); | |
| transport.removeListener("close", onClose); | |
| }); | |
| } | |
| /** | |
| * Upgrades socket to the given transport | |
| * | |
| * @param {Transport} transport | |
| * @api private | |
| */ | |
| maybeUpgrade(transport) { | |
| debug('might upgrade socket transport from "%s" to "%s"', this.transport.name, transport.name); | |
| this.upgrading = true; | |
| // set transport upgrade timer | |
| const upgradeTimeoutTimer = (0, timers_1.setTimeout)(() => { | |
| debug("client did not complete upgrade - closing transport"); | |
| cleanup(); | |
| if ("open" === transport.readyState) { | |
| transport.close(); | |
| } | |
| }, this.server.opts.upgradeTimeout); | |
| let checkIntervalTimer; | |
| const onPacket = (packet) => { | |
| if ("ping" === packet.type && "probe" === packet.data) { | |
| debug("got probe ping packet, sending pong"); | |
| transport.send([{ type: "pong", data: "probe" }]); | |
| this.emit("upgrading", transport); | |
| clearInterval(checkIntervalTimer); | |
| checkIntervalTimer = setInterval(check, 100); | |
| } | |
| else if ("upgrade" === packet.type && this.readyState !== "closed") { | |
| debug("got upgrade packet - upgrading"); | |
| cleanup(); | |
| this.transport.discard(); | |
| this.upgraded = true; | |
| this.clearTransport(); | |
| this.setTransport(transport); | |
| this.emit("upgrade", transport); | |
| this.flush(); | |
| if (this.readyState === "closing") { | |
| transport.close(() => { | |
| this.onClose("forced close"); | |
| }); | |
| } | |
| } | |
| else { | |
| cleanup(); | |
| transport.close(); | |
| } | |
| }; | |
| // we force a polling cycle to ensure a fast upgrade | |
| const check = () => { | |
| if ("polling" === this.transport.name && this.transport.writable) { | |
| debug("writing a noop packet to polling for fast upgrade"); | |
| this.transport.send([{ type: "noop" }]); | |
| } | |
| }; | |
| const cleanup = () => { | |
| this.upgrading = false; | |
| clearInterval(checkIntervalTimer); | |
| (0, timers_1.clearTimeout)(upgradeTimeoutTimer); | |
| transport.removeListener("packet", onPacket); | |
| transport.removeListener("close", onTransportClose); | |
| transport.removeListener("error", onError); | |
| this.removeListener("close", onClose); | |
| }; | |
| const onError = (err) => { | |
| debug("client did not complete upgrade - %s", err); | |
| cleanup(); | |
| transport.close(); | |
| transport = null; | |
| }; | |
| const onTransportClose = () => { | |
| onError("transport closed"); | |
| }; | |
| const onClose = () => { | |
| onError("socket closed"); | |
| }; | |
| transport.on("packet", onPacket); | |
| transport.once("close", onTransportClose); | |
| transport.once("error", onError); | |
| this.once("close", onClose); | |
| } | |
| /** | |
| * Clears listeners and timers associated with current transport. | |
| * | |
| * @api private | |
| */ | |
| clearTransport() { | |
| let cleanup; | |
| const toCleanUp = this.cleanupFn.length; | |
| for (let i = 0; i < toCleanUp; i++) { | |
| cleanup = this.cleanupFn.shift(); | |
| cleanup(); | |
| } | |
| // silence further transport errors and prevent uncaught exceptions | |
| this.transport.on("error", function () { | |
| debug("error triggered by discarded transport"); | |
| }); | |
| // ensure transport won't stay open | |
| this.transport.close(); | |
| (0, timers_1.clearTimeout)(this.pingTimeoutTimer); | |
| } | |
| /** | |
| * Called upon transport considered closed. | |
| * Possible reasons: `ping timeout`, `client error`, `parse error`, | |
| * `transport error`, `server close`, `transport close` | |
| */ | |
| onClose(reason, description) { | |
| if ("closed" !== this.readyState) { | |
| this.readyState = "closed"; | |
| // clear timers | |
| (0, timers_1.clearTimeout)(this.pingIntervalTimer); | |
| (0, timers_1.clearTimeout)(this.pingTimeoutTimer); | |
| // clean writeBuffer in next tick, so developers can still | |
| // grab the writeBuffer on 'close' event | |
| process.nextTick(() => { | |
| this.writeBuffer = []; | |
| }); | |
| this.packetsFn = []; | |
| this.sentCallbackFn = []; | |
| this.clearTransport(); | |
| this.emit("close", reason, description); | |
| } | |
| } | |
| /** | |
| * Setup and manage send callback | |
| * | |
| * @api private | |
| */ | |
| setupSendCallback() { | |
| // the message was sent successfully, execute the callback | |
| const onDrain = () => { | |
| if (this.sentCallbackFn.length > 0) { | |
| const seqFn = this.sentCallbackFn.splice(0, 1)[0]; | |
| if ("function" === typeof seqFn) { | |
| debug("executing send callback"); | |
| seqFn(this.transport); | |
| } | |
| else if (Array.isArray(seqFn)) { | |
| debug("executing batch send callback"); | |
| const l = seqFn.length; | |
| let i = 0; | |
| for (; i < l; i++) { | |
| if ("function" === typeof seqFn[i]) { | |
| seqFn[i](this.transport); | |
| } | |
| } | |
| } | |
| } | |
| }; | |
| this.transport.on("drain", onDrain); | |
| this.cleanupFn.push(() => { | |
| this.transport.removeListener("drain", onDrain); | |
| }); | |
| } | |
| /** | |
| * Sends a message packet. | |
| * | |
| * @param {Object} data | |
| * @param {Object} options | |
| * @param {Function} callback | |
| * @return {Socket} for chaining | |
| * @api public | |
| */ | |
| send(data, options, callback) { | |
| this.sendPacket("message", data, options, callback); | |
| return this; | |
| } | |
| /** | |
| * Alias of {@link send}. | |
| * | |
| * @param data | |
| * @param options | |
| * @param callback | |
| */ | |
| write(data, options, callback) { | |
| this.sendPacket("message", data, options, callback); | |
| return this; | |
| } | |
| /** | |
| * Sends a packet. | |
| * | |
| * @param {String} type - packet type | |
| * @param {String} data | |
| * @param {Object} options | |
| * @param {Function} callback | |
| * | |
| * @api private | |
| */ | |
| sendPacket(type, data, options = {}, callback) { | |
| if ("function" === typeof options) { | |
| callback = options; | |
| options = {}; | |
| } | |
| if ("closing" !== this.readyState && "closed" !== this.readyState) { | |
| debug('sending packet "%s" (%s)', type, data); | |
| // compression is enabled by default | |
| options.compress = options.compress !== false; | |
| const packet = { | |
| type, | |
| options: options, | |
| }; | |
| if (data) | |
| packet.data = data; | |
| // exports packetCreate event | |
| this.emit("packetCreate", packet); | |
| this.writeBuffer.push(packet); | |
| // add send callback to object, if defined | |
| if (callback) | |
| this.packetsFn.push(callback); | |
| this.flush(); | |
| } | |
| } | |
| /** | |
| * Attempts to flush the packets buffer. | |
| * | |
| * @api private | |
| */ | |
| flush() { | |
| if ("closed" !== this.readyState && | |
| this.transport.writable && | |
| this.writeBuffer.length) { | |
| debug("flushing buffer to transport"); | |
| this.emit("flush", this.writeBuffer); | |
| this.server.emit("flush", this, this.writeBuffer); | |
| const wbuf = this.writeBuffer; | |
| this.writeBuffer = []; | |
| if (!this.transport.supportsFraming) { | |
| this.sentCallbackFn.push(this.packetsFn); | |
| } | |
| else { | |
| this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn); | |
| } | |
| this.packetsFn = []; | |
| this.transport.send(wbuf); | |
| this.emit("drain"); | |
| this.server.emit("drain", this); | |
| } | |
| } | |
| /** | |
| * Get available upgrades for this socket. | |
| * | |
| * @api private | |
| */ | |
| getAvailableUpgrades() { | |
| const availableUpgrades = []; | |
| const allUpgrades = this.server.upgrades(this.transport.name); | |
| let i = 0; | |
| const l = allUpgrades.length; | |
| for (; i < l; ++i) { | |
| const upg = allUpgrades[i]; | |
| if (this.server.opts.transports.indexOf(upg) !== -1) { | |
| availableUpgrades.push(upg); | |
| } | |
| } | |
| return availableUpgrades; | |
| } | |
| /** | |
| * Closes the socket and underlying transport. | |
| * | |
| * @param {Boolean} discard - optional, discard the transport | |
| * @return {Socket} for chaining | |
| * @api public | |
| */ | |
| close(discard) { | |
| if ("open" !== this.readyState) | |
| return; | |
| this.readyState = "closing"; | |
| if (this.writeBuffer.length) { | |
| debug("there are %d remaining packets in the buffer, waiting for the 'drain' event", this.writeBuffer.length); | |
| this.once("drain", () => { | |
| debug("all packets have been sent, closing the transport"); | |
| this.closeTransport(discard); | |
| }); | |
| return; | |
| } | |
| debug("the buffer is empty, closing the transport right away", discard); | |
| this.closeTransport(discard); | |
| } | |
| /** | |
| * Closes the underlying transport. | |
| * | |
| * @param {Boolean} discard | |
| * @api private | |
| */ | |
| closeTransport(discard) { | |
| debug("closing the transport (discard? %s)", discard); | |
| if (discard) | |
| this.transport.discard(); | |
| this.transport.close(this.onClose.bind(this, "forced close")); | |
| } | |
| } | |
| exports.Socket = Socket; | |