aio-libs / aiohttp
1
"""WebSocket protocol versions 13 and 8."""
2

3 10
import asyncio
4 10
import collections
5 10
import json
6 10
import random
7 10
import re
8 10
import sys
9 10
import zlib
10 10
from enum import IntEnum
11 10
from struct import Struct
12 10
from typing import Any, Callable, List, Optional, Pattern, Set, Tuple, Union, cast
13

14 10
from typing_extensions import Final
15

16 10
from .base_protocol import BaseProtocol
17 10
from .helpers import NO_EXTENSIONS
18 10
from .streams import DataQueue
19

20 10
__all__ = (
21
    "WS_CLOSED_MESSAGE",
22
    "WS_CLOSING_MESSAGE",
23
    "WS_KEY",
24
    "WebSocketReader",
25
    "WebSocketWriter",
26
    "WSMessage",
27
    "WebSocketError",
28
    "WSMsgType",
29
    "WSCloseCode",
30
)
31

32

33 10
class WSCloseCode(IntEnum):
34 10
    OK = 1000
35 10
    GOING_AWAY = 1001
36 10
    PROTOCOL_ERROR = 1002
37 10
    UNSUPPORTED_DATA = 1003
38 10
    ABNORMAL_CLOSURE = 1006
39 10
    INVALID_TEXT = 1007
40 10
    POLICY_VIOLATION = 1008
41 10
    MESSAGE_TOO_BIG = 1009
42 10
    MANDATORY_EXTENSION = 1010
43 10
    INTERNAL_ERROR = 1011
44 10
    SERVICE_RESTART = 1012
45 10
    TRY_AGAIN_LATER = 1013
46 10
    BAD_GATEWAY = 1014
47

48

49 10
ALLOWED_CLOSE_CODES: Final[Set[int]] = {int(i) for i in WSCloseCode}
50

51

52 10
class WSMsgType(IntEnum):
53
    # websocket spec types
54 10
    CONTINUATION = 0x0
55 10
    TEXT = 0x1
56 10
    BINARY = 0x2
57 10
    PING = 0x9
58 10
    PONG = 0xA
59 10
    CLOSE = 0x8
60

61
    # aiohttp specific types
62 10
    CLOSING = 0x100
63 10
    CLOSED = 0x101
64 10
    ERROR = 0x102
65

66

67 10
WS_KEY: Final[bytes] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
68

69

70 10
UNPACK_LEN2 = Struct("!H").unpack_from
71 10
UNPACK_LEN3 = Struct("!Q").unpack_from
72 10
UNPACK_CLOSE_CODE = Struct("!H").unpack
73 10
PACK_LEN1 = Struct("!BB").pack
74 10
PACK_LEN2 = Struct("!BBH").pack
75 10
PACK_LEN3 = Struct("!BBQ").pack
76 10
PACK_CLOSE_CODE = Struct("!H").pack
77 10
MSG_SIZE: Final[int] = 2 ** 14
78 10
DEFAULT_LIMIT: Final[int] = 2 ** 16
79

80

81 10
_WSMessageBase = collections.namedtuple("_WSMessageBase", ["type", "data", "extra"])
82

83

84 10
class WSMessage(_WSMessageBase):
85 10
    def json(self, *, loads: Callable[[Any], Any] = json.loads) -> Any:
86
        """Return parsed JSON data.
87

88
        .. versionadded:: 0.22
89
        """
90 10
        return loads(self.data)
91

92

93 10
WS_CLOSED_MESSAGE = WSMessage(WSMsgType.CLOSED, None, None)
94 10
WS_CLOSING_MESSAGE = WSMessage(WSMsgType.CLOSING, None, None)
95

96

97 10
class WebSocketError(Exception):
98
    """WebSocket protocol parser error."""
99

100 10
    def __init__(self, code: int, message: str) -> None:
101 10
        self.code = code
102 10
        super().__init__(code, message)
103

104 10
    def __str__(self) -> str:
105 10
        return cast(str, self.args[1])
106

107

108 10
class WSHandshakeError(Exception):
109
    """WebSocket protocol handshake error."""
110

111

112 10
native_byteorder: Final[str] = sys.byteorder
113

114

115
# Used by _websocket_mask_python
116 10
_XOR_TABLE: Final[List[bytes]] = [bytes(a ^ b for a in range(256)) for b in range(256)]
117

118

119 10
def _websocket_mask_python(mask: bytes, data: bytearray) -> None:
120
    """Websocket masking function.
121

122
    `mask` is a `bytes` object of length 4; `data` is a `bytearray`
123
    object of any length. The contents of `data` are masked with `mask`,
124
    as specified in section 5.3 of RFC 6455.
125

126
    Note that this function mutates the `data` argument.
127

128
    This pure-python implementation may be replaced by an optimized
129
    version when available.
130

131
    """
132 10
    assert isinstance(data, bytearray), data
133 10
    assert len(mask) == 4, mask
134

135 10
    if data:
136 10
        a, b, c, d = (_XOR_TABLE[n] for n in mask)
137 10
        data[::4] = data[::4].translate(a)
138 10
        data[1::4] = data[1::4].translate(b)
139 10
        data[2::4] = data[2::4].translate(c)
140 10
        data[3::4] = data[3::4].translate(d)
141

142

143
if NO_EXTENSIONS:  # pragma: no cover
144
    _websocket_mask = _websocket_mask_python
145
else:
146 7
    try:
147 7
        from ._websocket import _websocket_mask_cython  # type: ignore[import]
148

149 7
        _websocket_mask = _websocket_mask_cython
150
    except ImportError:  # pragma: no cover
151
        _websocket_mask = _websocket_mask_python
152

153 10
_WS_DEFLATE_TRAILING: Final[bytes] = bytes([0x00, 0x00, 0xFF, 0xFF])
154

155

156 10
_WS_EXT_RE: Final[Pattern[str]] = re.compile(
157
    r"^(?:;\s*(?:"
158
    r"(server_no_context_takeover)|"
159
    r"(client_no_context_takeover)|"
160
    r"(server_max_window_bits(?:=(\d+))?)|"
161
    r"(client_max_window_bits(?:=(\d+))?)))*$"
162
)
163

164 10
_WS_EXT_RE_SPLIT: Final[Pattern[str]] = re.compile(r"permessage-deflate([^,]+)?")
165

166

167 10
def ws_ext_parse(extstr: Optional[str], isserver: bool = False) -> Tuple[int, bool]:
168 10
    if not extstr:
169 10
        return 0, False
170

171 10
    compress = 0
172 10
    notakeover = False
173 10
    for ext in _WS_EXT_RE_SPLIT.finditer(extstr):
174 10
        defext = ext.group(1)
175
        # Return compress = 15 when get `permessage-deflate`
176 10
        if not defext:
177 10
            compress = 15
178 10
            break
179 10
        match = _WS_EXT_RE.match(defext)
180 10
        if match:
181 10
            compress = 15
182 10
            if isserver:
183
                # Server never fail to detect compress handshake.
184
                # Server does not need to send max wbit to client
185 10
                if match.group(4):
186 10
                    compress = int(match.group(4))
187
                    # Group3 must match if group4 matches
188
                    # Compress wbit 8 does not support in zlib
189
                    # If compress level not support,
190
                    # CONTINUE to next extension
191 10
                    if compress > 15 or compress < 9:
192 10
                        compress = 0
193 10
                        continue
194 10
                if match.group(1):
195 10
                    notakeover = True
196
                # Ignore regex group 5 & 6 for client_max_window_bits
197 10
                break
198
            else:
199 10
                if match.group(6):
200 10
                    compress = int(match.group(6))
201
                    # Group5 must match if group6 matches
202
                    # Compress wbit 8 does not support in zlib
203
                    # If compress level not support,
204
                    # FAIL the parse progress
205 10
                    if compress > 15 or compress < 9:
206 10
                        raise WSHandshakeError("Invalid window size")
207 10
                if match.group(2):
208 10
                    notakeover = True
209
                # Ignore regex group 5 & 6 for client_max_window_bits
210 10
                break
211
        # Return Fail if client side and not match
212 10
        elif not isserver:
213 10
            raise WSHandshakeError("Extension for deflate not supported" + ext.group(1))
214

215 10
    return compress, notakeover
216

217

218 10
def ws_ext_gen(
219
    compress: int = 15, isserver: bool = False, server_notakeover: bool = False
220
) -> str:
221
    # client_notakeover=False not used for server
222
    # compress wbit 8 does not support in zlib
223 10
    if compress < 9 or compress > 15:
224 10
        raise ValueError(
225
            "Compress wbits must between 9 and 15, " "zlib does not support wbits=8"
226
        )
227 10
    enabledext = ["permessage-deflate"]
228 10
    if not isserver:
229 10
        enabledext.append("client_max_window_bits")
230

231 10
    if compress < 15:
232 10
        enabledext.append("server_max_window_bits=" + str(compress))
233 10
    if server_notakeover:
234 10
        enabledext.append("server_no_context_takeover")
235
    # if client_notakeover:
236
    #     enabledext.append('client_no_context_takeover')
237 10
    return "; ".join(enabledext)
238

239

240 10
class WSParserState(IntEnum):
241 10
    READ_HEADER = 1
242 10
    READ_PAYLOAD_LENGTH = 2
243 10
    READ_PAYLOAD_MASK = 3
244 10
    READ_PAYLOAD = 4
245

246

247 10
class WebSocketReader:
248 10
    def __init__(
249
        self, queue: DataQueue[WSMessage], max_msg_size: int, compress: bool = True
250
    ) -> None:
251 10
        self.queue = queue
252 10
        self._max_msg_size = max_msg_size
253

254 10
        self._exc = None  # type: Optional[BaseException]
255 10
        self._partial = bytearray()
256 10
        self._state = WSParserState.READ_HEADER
257

258 10
        self._opcode = None  # type: Optional[int]
259 10
        self._frame_fin = False
260 10
        self._frame_opcode = None  # type: Optional[int]
261 10
        self._frame_payload = bytearray()
262

263 10
        self._tail = b""
264 10
        self._has_mask = False
265 10
        self._frame_mask = None  # type: Optional[bytes]
266 10
        self._payload_length = 0
267 10
        self._payload_length_flag = 0
268 10
        self._compressed = None  # type: Optional[bool]
269 10
        self._decompressobj = None  # type: Any  # zlib.decompressobj actually
270 10
        self._compress = compress
271

272 10
    def feed_eof(self) -> None:
273 10
        self.queue.feed_eof()
274

275 10
    def feed_data(self, data: bytes) -> Tuple[bool, bytes]:
276 10
        if self._exc:
277 0
            return True, data
278

279 10
        try:
280 10
            return self._feed_data(data)
281 10
        except Exception as exc:
282 10
            self._exc = exc
283 10
            self.queue.set_exception(exc)
284 10
            return True, b""
285

286 10
    def _feed_data(self, data: bytes) -> Tuple[bool, bytes]:
287 10
        for fin, opcode, payload, compressed in self.parse_frame(data):
288 10
            if compressed and not self._decompressobj:
289 10
                self._decompressobj = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
290 10
            if opcode == WSMsgType.CLOSE:
291 10
                if len(payload) >= 2:
292 10
                    close_code = UNPACK_CLOSE_CODE(payload[:2])[0]
293 10
                    if close_code < 3000 and close_code not in ALLOWED_CLOSE_CODES:
294 10
                        raise WebSocketError(
295
                            WSCloseCode.PROTOCOL_ERROR,
296
                            f"Invalid close code: {close_code}",
297
                        )
298 10
                    try:
299 10
                        close_message = payload[2:].decode("utf-8")
300 10
                    except UnicodeDecodeError as exc:
301 10
                        raise WebSocketError(
302
                            WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
303
                        ) from exc
304 10
                    msg = WSMessage(WSMsgType.CLOSE, close_code, close_message)
305 10
                elif payload:
306 10
                    raise WebSocketError(
307
                        WSCloseCode.PROTOCOL_ERROR,
308
                        f"Invalid close frame: {fin} {opcode} {payload!r}",
309
                    )
310
                else:
311 10
                    msg = WSMessage(WSMsgType.CLOSE, 0, "")
312

313 10
                self.queue.feed_data(msg, 0)
314

315 10
            elif opcode == WSMsgType.PING:
316 10
                self.queue.feed_data(
317
                    WSMessage(WSMsgType.PING, payload, ""), len(payload)
318
                )
319

320 10
            elif opcode == WSMsgType.PONG:
321 10
                self.queue.feed_data(
322
                    WSMessage(WSMsgType.PONG, payload, ""), len(payload)
323
                )
324

325 10
            elif (
326
                opcode not in (WSMsgType.TEXT, WSMsgType.BINARY)
327
                and self._opcode is None
328
            ):
329 10
                raise WebSocketError(
330
                    WSCloseCode.PROTOCOL_ERROR, f"Unexpected opcode={opcode!r}"
331
                )
332
            else:
333
                # load text/binary
334 10
                if not fin:
335
                    # got partial frame payload
336 10
                    if opcode != WSMsgType.CONTINUATION:
337 10
                        self._opcode = opcode
338 10
                    self._partial.extend(payload)
339 10
                    if self._max_msg_size and len(self._partial) >= self._max_msg_size:
340 10
                        raise WebSocketError(
341
                            WSCloseCode.MESSAGE_TOO_BIG,
342
                            "Message size {} exceeds limit {}".format(
343
                                len(self._partial), self._max_msg_size
344
                            ),
345
                        )
346
                else:
347
                    # previous frame was non finished
348
                    # we should get continuation opcode
349 10
                    if self._partial:
350 10
                        if opcode != WSMsgType.CONTINUATION:
351 10
                            raise WebSocketError(
352
                                WSCloseCode.PROTOCOL_ERROR,
353
                                "The opcode in non-fin frame is expected "
354
                                "to be zero, got {!r}".format(opcode),
355
                            )
356

357 10
                    if opcode == WSMsgType.CONTINUATION:
358 10
                        assert self._opcode is not None
359 10
                        opcode = self._opcode
360 10
                        self._opcode = None
361

362 10
                    self._partial.extend(payload)
363 10
                    if self._max_msg_size and len(self._partial) >= self._max_msg_size:
364 10
                        raise WebSocketError(
365
                            WSCloseCode.MESSAGE_TOO_BIG,
366
                            "Message size {} exceeds limit {}".format(
367
                                len(self._partial), self._max_msg_size
368
                            ),
369
                        )
370

371
                    # Decompress process must to be done after all packets
372
                    # received.
373 10
                    if compressed:
374 10
                        self._partial.extend(_WS_DEFLATE_TRAILING)
375 10
                        payload_merged = self._decompressobj.decompress(
376
                            self._partial, self._max_msg_size
377
                        )
378 10
                        if self._decompressobj.unconsumed_tail:
379 10
                            left = len(self._decompressobj.unconsumed_tail)
380 10
                            raise WebSocketError(
381
                                WSCloseCode.MESSAGE_TOO_BIG,
382
                                "Decompressed message size {} exceeds limit {}".format(
383
                                    self._max_msg_size + left, self._max_msg_size
384
                                ),
385
                            )
386
                    else:
387 10
                        payload_merged = bytes(self._partial)
388

389 10
                    self._partial.clear()
390

391 10
                    if opcode == WSMsgType.TEXT:
392 10
                        try:
393 10
                            text = payload_merged.decode("utf-8")
394 10
                            self.queue.feed_data(
395
                                WSMessage(WSMsgType.TEXT, text, ""), len(text)
396
                            )
397 10
                        except UnicodeDecodeError as exc:
398 10
                            raise WebSocketError(
399
                                WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
400
                            ) from exc
401
                    else:
402 10
                        self.queue.feed_data(
403
                            WSMessage(WSMsgType.BINARY, payload_merged, ""),
404
                            len(payload_merged),
405
                        )
406

407 10
        return False, b""
408

409 10
    def parse_frame(
410
        self, buf: bytes
411
    ) -> List[Tuple[bool, Optional[int], bytearray, Optional[bool]]]:
412
        """Return the next frame from the socket."""
413 10
        frames = []
414 10
        if self._tail:
415 10
            buf, self._tail = self._tail + buf, b""
416

417 10
        start_pos = 0
418 10
        buf_length = len(buf)
419

420 3
        while True:
421
            # read header
422 10
            if self._state == WSParserState.READ_HEADER:
423 10
                if buf_length - start_pos >= 2:
424 10
                    data = buf[start_pos : start_pos + 2]
425 10
                    start_pos += 2
426 10
                    first_byte, second_byte = data
427

428 10
                    fin = (first_byte >> 7) & 1
429 10
                    rsv1 = (first_byte >> 6) & 1
430 10
                    rsv2 = (first_byte >> 5) & 1
431 10
                    rsv3 = (first_byte >> 4) & 1
432 10
                    opcode = first_byte & 0xF
433

434
                    # frame-fin = %x0 ; more frames of this message follow
435
                    #           / %x1 ; final frame of this message
436
                    # frame-rsv1 = %x0 ;
437
                    #    1 bit, MUST be 0 unless negotiated otherwise
438
                    # frame-rsv2 = %x0 ;
439
                    #    1 bit, MUST be 0 unless negotiated otherwise
440
                    # frame-rsv3 = %x0 ;
441
                    #    1 bit, MUST be 0 unless negotiated otherwise
442
                    #
443
                    # Remove rsv1 from this test for deflate development
444 10
                    if rsv2 or rsv3 or (rsv1 and not self._compress):
445 10
                        raise WebSocketError(
446
                            WSCloseCode.PROTOCOL_ERROR,
447
                            "Received frame with non-zero reserved bits",
448
                        )
449

450 10
                    if opcode > 0x7 and fin == 0:
451 10
                        raise WebSocketError(
452
                            WSCloseCode.PROTOCOL_ERROR,
453
                            "Received fragmented control frame",
454
                        )
455

456 10
                    has_mask = (second_byte >> 7) & 1
457 10
                    length = second_byte & 0x7F
458

459
                    # Control frames MUST have a payload
460
                    # length of 125 bytes or less
461 10
                    if opcode > 0x7 and length > 125:
462 10
                        raise WebSocketError(
463
                            WSCloseCode.PROTOCOL_ERROR,
464
                            "Control frame payload cannot be " "larger than 125 bytes",
465
                        )
466

467
                    # Set compress status if last package is FIN
468
                    # OR set compress status if this is first fragment
469
                    # Raise error if not first fragment with rsv1 = 0x1
470 10
                    if self._frame_fin or self._compressed is None:
471 10
                        self._compressed = True if rsv1 else False
472 10
                    elif rsv1:
473 10
                        raise WebSocketError(
474
                            WSCloseCode.PROTOCOL_ERROR,
475
                            "Received frame with non-zero reserved bits",
476
                        )
477

478 10
                    self._frame_fin = bool(fin)
479 10
                    self._frame_opcode = opcode
480 10
                    self._has_mask = bool(has_mask)
481 10
                    self._payload_length_flag = length
482 10
                    self._state = WSParserState.READ_PAYLOAD_LENGTH
483
                else:
484 3
                    break
485

486
            # read payload length
487 10
            if self._state == WSParserState.READ_PAYLOAD_LENGTH:
488 10
                length = self._payload_length_flag
489 10
                if length == 126:
490 10
                    if buf_length - start_pos >= 2:
491 10
                        data = buf[start_pos : start_pos + 2]
492 10
                        start_pos += 2
493 10
                        length = UNPACK_LEN2(data)[0]
494 10
                        self._payload_length = length
495 10
                        self._state = (
496
                            WSParserState.READ_PAYLOAD_MASK
497
                            if self._has_mask
498
                            else WSParserState.READ_PAYLOAD
499
                        )
500
                    else:
501 3
                        break
502 10
                elif length > 126:
503 10
                    if buf_length - start_pos >= 8:
504 10
                        data = buf[start_pos : start_pos + 8]
505 10
                        start_pos += 8
506 10
                        length = UNPACK_LEN3(data)[0]
507 10
                        self._payload_length = length
508 10
                        self._state = (
509
                            WSParserState.READ_PAYLOAD_MASK
510
                            if self._has_mask
511
                            else WSParserState.READ_PAYLOAD
512
                        )
513
                    else:
514 3
                        break
515
                else:
516 10
                    self._payload_length = length
517 10
                    self._state = (
518
                        WSParserState.READ_PAYLOAD_MASK
519
                        if self._has_mask
520
                        else WSParserState.READ_PAYLOAD
521
                    )
522

523
            # read payload mask
524 10
            if self._state == WSParserState.READ_PAYLOAD_MASK:
525 10
                if buf_length - start_pos >= 4:
526 10
                    self._frame_mask = buf[start_pos : start_pos + 4]
527 10
                    start_pos += 4
528 10
                    self._state = WSParserState.READ_PAYLOAD
529
                else:
530 3
                    break
531

532 10
            if self._state == WSParserState.READ_PAYLOAD:
533 10
                length = self._payload_length
534 10
                payload = self._frame_payload
535

536 10
                chunk_len = buf_length - start_pos
537 10
                if length >= chunk_len:
538 10
                    self._payload_length = length - chunk_len
539 10
                    payload.extend(buf[start_pos:])
540 10
                    start_pos = buf_length
541
                else:
542 10
                    self._payload_length = 0
543 10
                    payload.extend(buf[start_pos : start_pos + length])
544 10
                    start_pos = start_pos + length
545

546 10
                if self._payload_length == 0:
547 10
                    if self._has_mask:
548 10
                        assert self._frame_mask is not None
549 10
                        _websocket_mask(self._frame_mask, payload)
550

551 10
                    frames.append(
552
                        (self._frame_fin, self._frame_opcode, payload, self._compressed)
553
                    )
554

555 10
                    self._frame_payload = bytearray()
556 10
                    self._state = WSParserState.READ_HEADER
557
                else:
558 3
                    break
559

560 10
        self._tail = buf[start_pos:]
561

562 10
        return frames
563

564

565 10
class WebSocketWriter:
566 10
    def __init__(
567
        self,
568
        protocol: BaseProtocol,
569
        transport: asyncio.Transport,
570
        *,
571
        use_mask: bool = False,
572
        limit: int = DEFAULT_LIMIT,
573
        random: Any = random.Random(),
574
        compress: int = 0,
575
        notakeover: bool = False,
576
    ) -> None:
577 10
        self.protocol = protocol
578 10
        self.transport = transport
579 10
        self.use_mask = use_mask
580 10
        self.randrange = random.randrange
581 10
        self.compress = compress
582 10
        self.notakeover = notakeover
583 10
        self._closing = False
584 10
        self._limit = limit
585 10
        self._output_size = 0
586 10
        self._compressobj = None  # type: Any  # actually compressobj
587

588 10
    async def _send_frame(
589
        self, message: bytes, opcode: int, compress: Optional[int] = None
590
    ) -> None:
591
        """Send a frame over the websocket with message as its payload."""
592 10
        if self._closing and not (opcode & WSMsgType.CLOSE):
593 10
            raise ConnectionResetError("Cannot write to closing transport")
594

595 10
        rsv = 0
596

597
        # Only compress larger packets (disabled)
598
        # Does small packet needs to be compressed?
599
        # if self.compress and opcode < 8 and len(message) > 124:
600 10
        if (compress or self.compress) and opcode < 8:
601 10
            if compress:
602
                # Do not set self._compress if compressing is for this frame
603 10
                compressobj = zlib.compressobj(level=zlib.Z_BEST_SPEED, wbits=-compress)
604
            else:  # self.compress
605 10
                if not self._compressobj:
606 10
                    self._compressobj = zlib.compressobj(
607
                        level=zlib.Z_BEST_SPEED, wbits=-self.compress
608
                    )
609 10
                compressobj = self._compressobj
610

611 10
            message = compressobj.compress(message)
612 10
            message = message + compressobj.flush(
613
                zlib.Z_FULL_FLUSH if self.notakeover else zlib.Z_SYNC_FLUSH
614
            )
615 10
            if message.endswith(_WS_DEFLATE_TRAILING):
616 10
                message = message[:-4]
617 10
            rsv = rsv | 0x40
618

619 10
        msg_length = len(message)
620

621 10
        use_mask = self.use_mask
622 10
        if use_mask:
623 10
            mask_bit = 0x80
624
        else:
625 10
            mask_bit = 0
626

627 10
        if msg_length < 126:
628 10
            header = PACK_LEN1(0x80 | rsv | opcode, msg_length | mask_bit)
629 10
        elif msg_length < (1 << 16):
630 10
            header = PACK_LEN2(0x80 | rsv | opcode, 126 | mask_bit, msg_length)
631
        else:
632 10
            header = PACK_LEN3(0x80 | rsv | opcode, 127 | mask_bit, msg_length)
633 10
        if use_mask:
634 10
            mask = self.randrange(0, 0xFFFFFFFF)
635 10
            mask = mask.to_bytes(4, "big")
636 10
            message = bytearray(message)
637 10
            _websocket_mask(mask, message)
638 10
            self._write(header + mask + message)
639 10
            self._output_size += len(header) + len(mask) + len(message)
640
        else:
641 10
            if len(message) > MSG_SIZE:
642 10
                self._write(header)
643 10
                self._write(message)
644
            else:
645 10
                self._write(header + message)
646

647 10
            self._output_size += len(header) + len(message)
648

649 10
        if self._output_size > self._limit:
650 10
            self._output_size = 0
651 10
            await self.protocol._drain_helper()
652

653 10
    def _write(self, data: bytes) -> None:
654 10
        if self.transport is None or self.transport.is_closing():
655 10
            raise ConnectionResetError("Cannot write to closing transport")
656 10
        self.transport.write(data)
657

658 10
    async def pong(self, message: bytes = b"") -> None:
659
        """Send pong message."""
660 10
        if isinstance(message, str):
661 10
            message = message.encode("utf-8")
662 10
        await self._send_frame(message, WSMsgType.PONG)
663

664 10
    async def ping(self, message: bytes = b"") -> None:
665
        """Send ping message."""
666 10
        if isinstance(message, str):
667 10
            message = message.encode("utf-8")
668 10
        await self._send_frame(message, WSMsgType.PING)
669

670 10
    async def send(
671
        self,
672
        message: Union[str, bytes],
673
        binary: bool = False,
674
        compress: Optional[int] = None,
675
    ) -> None:
676
        """Send a frame over the websocket with message as its payload."""
677 10
        if isinstance(message, str):
678 10
            message = message.encode("utf-8")
679 10
        if binary:
680 10
            await self._send_frame(message, WSMsgType.BINARY, compress)
681
        else:
682 10
            await self._send_frame(message, WSMsgType.TEXT, compress)
683

684 10
    async def close(self, code: int = 1000, message: bytes = b"") -> None:
685
        """Close the websocket, sending the specified code and message."""
686 10
        if isinstance(message, str):
687 10
            message = message.encode("utf-8")
688 10
        try:
689 10
            await self._send_frame(
690
                PACK_CLOSE_CODE(code) + message, opcode=WSMsgType.CLOSE
691
            )
692
        finally:
693 10
            self._closing = True

Read our documentation on viewing source code .

Loading