Spaces:
Runtime error
Runtime error
| """WebSocket protocol versions 13 and 8.""" | |
| import asyncio | |
| import functools | |
| import json | |
| import random | |
| import re | |
| import sys | |
| import zlib | |
| from enum import IntEnum | |
| from struct import Struct | |
| from typing import ( | |
| Any, | |
| Callable, | |
| Final, | |
| List, | |
| NamedTuple, | |
| Optional, | |
| Pattern, | |
| Set, | |
| Tuple, | |
| Union, | |
| cast, | |
| ) | |
| from .base_protocol import BaseProtocol | |
| from .compression_utils import ZLibCompressor, ZLibDecompressor | |
| from .helpers import NO_EXTENSIONS, set_exception | |
| from .streams import DataQueue | |
| __all__ = ( | |
| "WS_CLOSED_MESSAGE", | |
| "WS_CLOSING_MESSAGE", | |
| "WS_KEY", | |
| "WebSocketReader", | |
| "WebSocketWriter", | |
| "WSMessage", | |
| "WebSocketError", | |
| "WSMsgType", | |
| "WSCloseCode", | |
| ) | |
| class WSCloseCode(IntEnum): | |
| OK = 1000 | |
| GOING_AWAY = 1001 | |
| PROTOCOL_ERROR = 1002 | |
| UNSUPPORTED_DATA = 1003 | |
| ABNORMAL_CLOSURE = 1006 | |
| INVALID_TEXT = 1007 | |
| POLICY_VIOLATION = 1008 | |
| MESSAGE_TOO_BIG = 1009 | |
| MANDATORY_EXTENSION = 1010 | |
| INTERNAL_ERROR = 1011 | |
| SERVICE_RESTART = 1012 | |
| TRY_AGAIN_LATER = 1013 | |
| BAD_GATEWAY = 1014 | |
| ALLOWED_CLOSE_CODES: Final[Set[int]] = {int(i) for i in WSCloseCode} | |
| # For websockets, keeping latency low is extremely important as implementations | |
| # generally expect to be able to send and receive messages quickly. We use a | |
| # larger chunk size than the default to reduce the number of executor calls | |
| # since the executor is a significant source of latency and overhead when | |
| # the chunks are small. A size of 5KiB was chosen because it is also the | |
| # same value python-zlib-ng choose to use as the threshold to release the GIL. | |
| WEBSOCKET_MAX_SYNC_CHUNK_SIZE = 5 * 1024 | |
| class WSMsgType(IntEnum): | |
| # websocket spec types | |
| CONTINUATION = 0x0 | |
| TEXT = 0x1 | |
| BINARY = 0x2 | |
| PING = 0x9 | |
| PONG = 0xA | |
| CLOSE = 0x8 | |
| # aiohttp specific types | |
| CLOSING = 0x100 | |
| CLOSED = 0x101 | |
| ERROR = 0x102 | |
| text = TEXT | |
| binary = BINARY | |
| ping = PING | |
| pong = PONG | |
| close = CLOSE | |
| closing = CLOSING | |
| closed = CLOSED | |
| error = ERROR | |
| WS_KEY: Final[bytes] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11" | |
| UNPACK_LEN2 = Struct("!H").unpack_from | |
| UNPACK_LEN3 = Struct("!Q").unpack_from | |
| UNPACK_CLOSE_CODE = Struct("!H").unpack | |
| PACK_LEN1 = Struct("!BB").pack | |
| PACK_LEN2 = Struct("!BBH").pack | |
| PACK_LEN3 = Struct("!BBQ").pack | |
| PACK_CLOSE_CODE = Struct("!H").pack | |
| MSG_SIZE: Final[int] = 2**14 | |
| DEFAULT_LIMIT: Final[int] = 2**16 | |
| class WSMessage(NamedTuple): | |
| type: WSMsgType | |
| # To type correctly, this would need some kind of tagged union for each type. | |
| data: Any | |
| extra: Optional[str] | |
| def json(self, *, loads: Callable[[Any], Any] = json.loads) -> Any: | |
| """Return parsed JSON data. | |
| .. versionadded:: 0.22 | |
| """ | |
| return loads(self.data) | |
| WS_CLOSED_MESSAGE = WSMessage(WSMsgType.CLOSED, None, None) | |
| WS_CLOSING_MESSAGE = WSMessage(WSMsgType.CLOSING, None, None) | |
| class WebSocketError(Exception): | |
| """WebSocket protocol parser error.""" | |
| def __init__(self, code: int, message: str) -> None: | |
| self.code = code | |
| super().__init__(code, message) | |
| def __str__(self) -> str: | |
| return cast(str, self.args[1]) | |
| class WSHandshakeError(Exception): | |
| """WebSocket protocol handshake error.""" | |
| native_byteorder: Final[str] = sys.byteorder | |
| # Used by _websocket_mask_python | |
| def _xor_table() -> List[bytes]: | |
| return [bytes(a ^ b for a in range(256)) for b in range(256)] | |
| def _websocket_mask_python(mask: bytes, data: bytearray) -> None: | |
| """Websocket masking function. | |
| `mask` is a `bytes` object of length 4; `data` is a `bytearray` | |
| object of any length. The contents of `data` are masked with `mask`, | |
| as specified in section 5.3 of RFC 6455. | |
| Note that this function mutates the `data` argument. | |
| This pure-python implementation may be replaced by an optimized | |
| version when available. | |
| """ | |
| assert isinstance(data, bytearray), data | |
| assert len(mask) == 4, mask | |
| if data: | |
| _XOR_TABLE = _xor_table() | |
| a, b, c, d = (_XOR_TABLE[n] for n in mask) | |
| data[::4] = data[::4].translate(a) | |
| data[1::4] = data[1::4].translate(b) | |
| data[2::4] = data[2::4].translate(c) | |
| data[3::4] = data[3::4].translate(d) | |
| if NO_EXTENSIONS: # pragma: no cover | |
| _websocket_mask = _websocket_mask_python | |
| else: | |
| try: | |
| from ._websocket import _websocket_mask_cython # type: ignore[import-not-found] | |
| _websocket_mask = _websocket_mask_cython | |
| except ImportError: # pragma: no cover | |
| _websocket_mask = _websocket_mask_python | |
| _WS_DEFLATE_TRAILING: Final[bytes] = bytes([0x00, 0x00, 0xFF, 0xFF]) | |
| _WS_EXT_RE: Final[Pattern[str]] = re.compile( | |
| r"^(?:;\s*(?:" | |
| r"(server_no_context_takeover)|" | |
| r"(client_no_context_takeover)|" | |
| r"(server_max_window_bits(?:=(\d+))?)|" | |
| r"(client_max_window_bits(?:=(\d+))?)))*$" | |
| ) | |
| _WS_EXT_RE_SPLIT: Final[Pattern[str]] = re.compile(r"permessage-deflate([^,]+)?") | |
| def ws_ext_parse(extstr: Optional[str], isserver: bool = False) -> Tuple[int, bool]: | |
| if not extstr: | |
| return 0, False | |
| compress = 0 | |
| notakeover = False | |
| for ext in _WS_EXT_RE_SPLIT.finditer(extstr): | |
| defext = ext.group(1) | |
| # Return compress = 15 when get `permessage-deflate` | |
| if not defext: | |
| compress = 15 | |
| break | |
| match = _WS_EXT_RE.match(defext) | |
| if match: | |
| compress = 15 | |
| if isserver: | |
| # Server never fail to detect compress handshake. | |
| # Server does not need to send max wbit to client | |
| if match.group(4): | |
| compress = int(match.group(4)) | |
| # Group3 must match if group4 matches | |
| # Compress wbit 8 does not support in zlib | |
| # If compress level not support, | |
| # CONTINUE to next extension | |
| if compress > 15 or compress < 9: | |
| compress = 0 | |
| continue | |
| if match.group(1): | |
| notakeover = True | |
| # Ignore regex group 5 & 6 for client_max_window_bits | |
| break | |
| else: | |
| if match.group(6): | |
| compress = int(match.group(6)) | |
| # Group5 must match if group6 matches | |
| # Compress wbit 8 does not support in zlib | |
| # If compress level not support, | |
| # FAIL the parse progress | |
| if compress > 15 or compress < 9: | |
| raise WSHandshakeError("Invalid window size") | |
| if match.group(2): | |
| notakeover = True | |
| # Ignore regex group 5 & 6 for client_max_window_bits | |
| break | |
| # Return Fail if client side and not match | |
| elif not isserver: | |
| raise WSHandshakeError("Extension for deflate not supported" + ext.group(1)) | |
| return compress, notakeover | |
| def ws_ext_gen( | |
| compress: int = 15, isserver: bool = False, server_notakeover: bool = False | |
| ) -> str: | |
| # client_notakeover=False not used for server | |
| # compress wbit 8 does not support in zlib | |
| if compress < 9 or compress > 15: | |
| raise ValueError( | |
| "Compress wbits must between 9 and 15, " "zlib does not support wbits=8" | |
| ) | |
| enabledext = ["permessage-deflate"] | |
| if not isserver: | |
| enabledext.append("client_max_window_bits") | |
| if compress < 15: | |
| enabledext.append("server_max_window_bits=" + str(compress)) | |
| if server_notakeover: | |
| enabledext.append("server_no_context_takeover") | |
| # if client_notakeover: | |
| # enabledext.append('client_no_context_takeover') | |
| return "; ".join(enabledext) | |
| class WSParserState(IntEnum): | |
| READ_HEADER = 1 | |
| READ_PAYLOAD_LENGTH = 2 | |
| READ_PAYLOAD_MASK = 3 | |
| READ_PAYLOAD = 4 | |
| class WebSocketReader: | |
| def __init__( | |
| self, queue: DataQueue[WSMessage], max_msg_size: int, compress: bool = True | |
| ) -> None: | |
| self.queue = queue | |
| self._max_msg_size = max_msg_size | |
| self._exc: Optional[BaseException] = None | |
| self._partial = bytearray() | |
| self._state = WSParserState.READ_HEADER | |
| self._opcode: Optional[int] = None | |
| self._frame_fin = False | |
| self._frame_opcode: Optional[int] = None | |
| self._frame_payload = bytearray() | |
| self._tail = b"" | |
| self._has_mask = False | |
| self._frame_mask: Optional[bytes] = None | |
| self._payload_length = 0 | |
| self._payload_length_flag = 0 | |
| self._compressed: Optional[bool] = None | |
| self._decompressobj: Optional[ZLibDecompressor] = None | |
| self._compress = compress | |
| def feed_eof(self) -> None: | |
| self.queue.feed_eof() | |
| def feed_data(self, data: bytes) -> Tuple[bool, bytes]: | |
| if self._exc: | |
| return True, data | |
| try: | |
| return self._feed_data(data) | |
| except Exception as exc: | |
| self._exc = exc | |
| set_exception(self.queue, exc) | |
| return True, b"" | |
| def _feed_data(self, data: bytes) -> Tuple[bool, bytes]: | |
| for fin, opcode, payload, compressed in self.parse_frame(data): | |
| if compressed and not self._decompressobj: | |
| self._decompressobj = ZLibDecompressor(suppress_deflate_header=True) | |
| if opcode == WSMsgType.CLOSE: | |
| if len(payload) >= 2: | |
| close_code = UNPACK_CLOSE_CODE(payload[:2])[0] | |
| if close_code < 3000 and close_code not in ALLOWED_CLOSE_CODES: | |
| raise WebSocketError( | |
| WSCloseCode.PROTOCOL_ERROR, | |
| f"Invalid close code: {close_code}", | |
| ) | |
| try: | |
| close_message = payload[2:].decode("utf-8") | |
| except UnicodeDecodeError as exc: | |
| raise WebSocketError( | |
| WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message" | |
| ) from exc | |
| msg = WSMessage(WSMsgType.CLOSE, close_code, close_message) | |
| elif payload: | |
| raise WebSocketError( | |
| WSCloseCode.PROTOCOL_ERROR, | |
| f"Invalid close frame: {fin} {opcode} {payload!r}", | |
| ) | |
| else: | |
| msg = WSMessage(WSMsgType.CLOSE, 0, "") | |
| self.queue.feed_data(msg, 0) | |
| elif opcode == WSMsgType.PING: | |
| self.queue.feed_data( | |
| WSMessage(WSMsgType.PING, payload, ""), len(payload) | |
| ) | |
| elif opcode == WSMsgType.PONG: | |
| self.queue.feed_data( | |
| WSMessage(WSMsgType.PONG, payload, ""), len(payload) | |
| ) | |
| elif ( | |
| opcode not in (WSMsgType.TEXT, WSMsgType.BINARY) | |
| and self._opcode is None | |
| ): | |
| raise WebSocketError( | |
| WSCloseCode.PROTOCOL_ERROR, f"Unexpected opcode={opcode!r}" | |
| ) | |
| else: | |
| # load text/binary | |
| if not fin: | |
| # got partial frame payload | |
| if opcode != WSMsgType.CONTINUATION: | |
| self._opcode = opcode | |
| self._partial.extend(payload) | |
| if self._max_msg_size and len(self._partial) >= self._max_msg_size: | |
| raise WebSocketError( | |
| WSCloseCode.MESSAGE_TOO_BIG, | |
| "Message size {} exceeds limit {}".format( | |
| len(self._partial), self._max_msg_size | |
| ), | |
| ) | |
| else: | |
| # previous frame was non finished | |
| # we should get continuation opcode | |
| if self._partial: | |
| if opcode != WSMsgType.CONTINUATION: | |
| raise WebSocketError( | |
| WSCloseCode.PROTOCOL_ERROR, | |
| "The opcode in non-fin frame is expected " | |
| "to be zero, got {!r}".format(opcode), | |
| ) | |
| if opcode == WSMsgType.CONTINUATION: | |
| assert self._opcode is not None | |
| opcode = self._opcode | |
| self._opcode = None | |
| self._partial.extend(payload) | |
| if self._max_msg_size and len(self._partial) >= self._max_msg_size: | |
| raise WebSocketError( | |
| WSCloseCode.MESSAGE_TOO_BIG, | |
| "Message size {} exceeds limit {}".format( | |
| len(self._partial), self._max_msg_size | |
| ), | |
| ) | |
| # Decompress process must to be done after all packets | |
| # received. | |
| if compressed: | |
| assert self._decompressobj is not None | |
| self._partial.extend(_WS_DEFLATE_TRAILING) | |
| payload_merged = self._decompressobj.decompress_sync( | |
| self._partial, self._max_msg_size | |
| ) | |
| if self._decompressobj.unconsumed_tail: | |
| left = len(self._decompressobj.unconsumed_tail) | |
| raise WebSocketError( | |
| WSCloseCode.MESSAGE_TOO_BIG, | |
| "Decompressed message size {} exceeds limit {}".format( | |
| self._max_msg_size + left, self._max_msg_size | |
| ), | |
| ) | |
| else: | |
| payload_merged = bytes(self._partial) | |
| self._partial.clear() | |
| if opcode == WSMsgType.TEXT: | |
| try: | |
| text = payload_merged.decode("utf-8") | |
| self.queue.feed_data( | |
| WSMessage(WSMsgType.TEXT, text, ""), len(text) | |
| ) | |
| except UnicodeDecodeError as exc: | |
| raise WebSocketError( | |
| WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message" | |
| ) from exc | |
| else: | |
| self.queue.feed_data( | |
| WSMessage(WSMsgType.BINARY, payload_merged, ""), | |
| len(payload_merged), | |
| ) | |
| return False, b"" | |
| def parse_frame( | |
| self, buf: bytes | |
| ) -> List[Tuple[bool, Optional[int], bytearray, Optional[bool]]]: | |
| """Return the next frame from the socket.""" | |
| frames = [] | |
| if self._tail: | |
| buf, self._tail = self._tail + buf, b"" | |
| start_pos = 0 | |
| buf_length = len(buf) | |
| while True: | |
| # read header | |
| if self._state == WSParserState.READ_HEADER: | |
| if buf_length - start_pos >= 2: | |
| data = buf[start_pos : start_pos + 2] | |
| start_pos += 2 | |
| first_byte, second_byte = data | |
| fin = (first_byte >> 7) & 1 | |
| rsv1 = (first_byte >> 6) & 1 | |
| rsv2 = (first_byte >> 5) & 1 | |
| rsv3 = (first_byte >> 4) & 1 | |
| opcode = first_byte & 0xF | |
| # frame-fin = %x0 ; more frames of this message follow | |
| # / %x1 ; final frame of this message | |
| # frame-rsv1 = %x0 ; | |
| # 1 bit, MUST be 0 unless negotiated otherwise | |
| # frame-rsv2 = %x0 ; | |
| # 1 bit, MUST be 0 unless negotiated otherwise | |
| # frame-rsv3 = %x0 ; | |
| # 1 bit, MUST be 0 unless negotiated otherwise | |
| # | |
| # Remove rsv1 from this test for deflate development | |
| if rsv2 or rsv3 or (rsv1 and not self._compress): | |
| raise WebSocketError( | |
| WSCloseCode.PROTOCOL_ERROR, | |
| "Received frame with non-zero reserved bits", | |
| ) | |
| if opcode > 0x7 and fin == 0: | |
| raise WebSocketError( | |
| WSCloseCode.PROTOCOL_ERROR, | |
| "Received fragmented control frame", | |
| ) | |
| has_mask = (second_byte >> 7) & 1 | |
| length = second_byte & 0x7F | |
| # Control frames MUST have a payload | |
| # length of 125 bytes or less | |
| if opcode > 0x7 and length > 125: | |
| raise WebSocketError( | |
| WSCloseCode.PROTOCOL_ERROR, | |
| "Control frame payload cannot be " "larger than 125 bytes", | |
| ) | |
| # Set compress status if last package is FIN | |
| # OR set compress status if this is first fragment | |
| # Raise error if not first fragment with rsv1 = 0x1 | |
| if self._frame_fin or self._compressed is None: | |
| self._compressed = True if rsv1 else False | |
| elif rsv1: | |
| raise WebSocketError( | |
| WSCloseCode.PROTOCOL_ERROR, | |
| "Received frame with non-zero reserved bits", | |
| ) | |
| self._frame_fin = bool(fin) | |
| self._frame_opcode = opcode | |
| self._has_mask = bool(has_mask) | |
| self._payload_length_flag = length | |
| self._state = WSParserState.READ_PAYLOAD_LENGTH | |
| else: | |
| break | |
| # read payload length | |
| if self._state == WSParserState.READ_PAYLOAD_LENGTH: | |
| length = self._payload_length_flag | |
| if length == 126: | |
| if buf_length - start_pos >= 2: | |
| data = buf[start_pos : start_pos + 2] | |
| start_pos += 2 | |
| length = UNPACK_LEN2(data)[0] | |
| self._payload_length = length | |
| self._state = ( | |
| WSParserState.READ_PAYLOAD_MASK | |
| if self._has_mask | |
| else WSParserState.READ_PAYLOAD | |
| ) | |
| else: | |
| break | |
| elif length > 126: | |
| if buf_length - start_pos >= 8: | |
| data = buf[start_pos : start_pos + 8] | |
| start_pos += 8 | |
| length = UNPACK_LEN3(data)[0] | |
| self._payload_length = length | |
| self._state = ( | |
| WSParserState.READ_PAYLOAD_MASK | |
| if self._has_mask | |
| else WSParserState.READ_PAYLOAD | |
| ) | |
| else: | |
| break | |
| else: | |
| self._payload_length = length | |
| self._state = ( | |
| WSParserState.READ_PAYLOAD_MASK | |
| if self._has_mask | |
| else WSParserState.READ_PAYLOAD | |
| ) | |
| # read payload mask | |
| if self._state == WSParserState.READ_PAYLOAD_MASK: | |
| if buf_length - start_pos >= 4: | |
| self._frame_mask = buf[start_pos : start_pos + 4] | |
| start_pos += 4 | |
| self._state = WSParserState.READ_PAYLOAD | |
| else: | |
| break | |
| if self._state == WSParserState.READ_PAYLOAD: | |
| length = self._payload_length | |
| payload = self._frame_payload | |
| chunk_len = buf_length - start_pos | |
| if length >= chunk_len: | |
| self._payload_length = length - chunk_len | |
| payload.extend(buf[start_pos:]) | |
| start_pos = buf_length | |
| else: | |
| self._payload_length = 0 | |
| payload.extend(buf[start_pos : start_pos + length]) | |
| start_pos = start_pos + length | |
| if self._payload_length == 0: | |
| if self._has_mask: | |
| assert self._frame_mask is not None | |
| _websocket_mask(self._frame_mask, payload) | |
| frames.append( | |
| (self._frame_fin, self._frame_opcode, payload, self._compressed) | |
| ) | |
| self._frame_payload = bytearray() | |
| self._state = WSParserState.READ_HEADER | |
| else: | |
| break | |
| self._tail = buf[start_pos:] | |
| return frames | |
| class WebSocketWriter: | |
| def __init__( | |
| self, | |
| protocol: BaseProtocol, | |
| transport: asyncio.Transport, | |
| *, | |
| use_mask: bool = False, | |
| limit: int = DEFAULT_LIMIT, | |
| random: random.Random = random.Random(), | |
| compress: int = 0, | |
| notakeover: bool = False, | |
| ) -> None: | |
| self.protocol = protocol | |
| self.transport = transport | |
| self.use_mask = use_mask | |
| self.randrange = random.randrange | |
| self.compress = compress | |
| self.notakeover = notakeover | |
| self._closing = False | |
| self._limit = limit | |
| self._output_size = 0 | |
| self._compressobj: Any = None # actually compressobj | |
| async def _send_frame( | |
| self, message: bytes, opcode: int, compress: Optional[int] = None | |
| ) -> None: | |
| """Send a frame over the websocket with message as its payload.""" | |
| if self._closing and not (opcode & WSMsgType.CLOSE): | |
| raise ConnectionResetError("Cannot write to closing transport") | |
| rsv = 0 | |
| # Only compress larger packets (disabled) | |
| # Does small packet needs to be compressed? | |
| # if self.compress and opcode < 8 and len(message) > 124: | |
| if (compress or self.compress) and opcode < 8: | |
| if compress: | |
| # Do not set self._compress if compressing is for this frame | |
| compressobj = self._make_compress_obj(compress) | |
| else: # self.compress | |
| if not self._compressobj: | |
| self._compressobj = self._make_compress_obj(self.compress) | |
| compressobj = self._compressobj | |
| message = await compressobj.compress(message) | |
| # Its critical that we do not return control to the event | |
| # loop until we have finished sending all the compressed | |
| # data. Otherwise we could end up mixing compressed frames | |
| # if there are multiple coroutines compressing data. | |
| message += compressobj.flush( | |
| zlib.Z_FULL_FLUSH if self.notakeover else zlib.Z_SYNC_FLUSH | |
| ) | |
| if message.endswith(_WS_DEFLATE_TRAILING): | |
| message = message[:-4] | |
| rsv = rsv | 0x40 | |
| msg_length = len(message) | |
| use_mask = self.use_mask | |
| if use_mask: | |
| mask_bit = 0x80 | |
| else: | |
| mask_bit = 0 | |
| if msg_length < 126: | |
| header = PACK_LEN1(0x80 | rsv | opcode, msg_length | mask_bit) | |
| elif msg_length < (1 << 16): | |
| header = PACK_LEN2(0x80 | rsv | opcode, 126 | mask_bit, msg_length) | |
| else: | |
| header = PACK_LEN3(0x80 | rsv | opcode, 127 | mask_bit, msg_length) | |
| if use_mask: | |
| mask_int = self.randrange(0, 0xFFFFFFFF) | |
| mask = mask_int.to_bytes(4, "big") | |
| message = bytearray(message) | |
| _websocket_mask(mask, message) | |
| self._write(header + mask + message) | |
| self._output_size += len(header) + len(mask) + msg_length | |
| else: | |
| if msg_length > MSG_SIZE: | |
| self._write(header) | |
| self._write(message) | |
| else: | |
| self._write(header + message) | |
| self._output_size += len(header) + msg_length | |
| # It is safe to return control to the event loop when using compression | |
| # after this point as we have already sent or buffered all the data. | |
| if self._output_size > self._limit: | |
| self._output_size = 0 | |
| await self.protocol._drain_helper() | |
| def _make_compress_obj(self, compress: int) -> ZLibCompressor: | |
| return ZLibCompressor( | |
| level=zlib.Z_BEST_SPEED, | |
| wbits=-compress, | |
| max_sync_chunk_size=WEBSOCKET_MAX_SYNC_CHUNK_SIZE, | |
| ) | |
| def _write(self, data: bytes) -> None: | |
| if self.transport is None or self.transport.is_closing(): | |
| raise ConnectionResetError("Cannot write to closing transport") | |
| self.transport.write(data) | |
| async def pong(self, message: Union[bytes, str] = b"") -> None: | |
| """Send pong message.""" | |
| if isinstance(message, str): | |
| message = message.encode("utf-8") | |
| await self._send_frame(message, WSMsgType.PONG) | |
| async def ping(self, message: Union[bytes, str] = b"") -> None: | |
| """Send ping message.""" | |
| if isinstance(message, str): | |
| message = message.encode("utf-8") | |
| await self._send_frame(message, WSMsgType.PING) | |
| async def send( | |
| self, | |
| message: Union[str, bytes], | |
| binary: bool = False, | |
| compress: Optional[int] = None, | |
| ) -> None: | |
| """Send a frame over the websocket with message as its payload.""" | |
| if isinstance(message, str): | |
| message = message.encode("utf-8") | |
| if binary: | |
| await self._send_frame(message, WSMsgType.BINARY, compress) | |
| else: | |
| await self._send_frame(message, WSMsgType.TEXT, compress) | |
| async def close(self, code: int = 1000, message: Union[bytes, str] = b"") -> None: | |
| """Close the websocket, sending the specified code and message.""" | |
| if isinstance(message, str): | |
| message = message.encode("utf-8") | |
| try: | |
| await self._send_frame( | |
| PACK_CLOSE_CODE(code) + message, opcode=WSMsgType.CLOSE | |
| ) | |
| finally: | |
| self._closing = True | |