aio-libs / aiohttp
1 10
import asyncio
2 10
import codecs
3 10
import dataclasses
4 10
import functools
5 10
import io
6 10
import re
7 10
import sys
8 10
import traceback
9 10
import warnings
10 10
from hashlib import md5, sha1, sha256
11 10
from http.cookies import CookieError, Morsel, SimpleCookie
12 10
from types import MappingProxyType, TracebackType
13 10
from typing import (
14
    TYPE_CHECKING,
15
    Any,
16
    Dict,
17
    Iterable,
18
    List,
19
    Mapping,
20
    Optional,
21
    Tuple,
22
    Type,
23
    Union,
24
    cast,
25
)
26

27 10
from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
28 10
from yarl import URL
29

30 10
from . import hdrs, helpers, http, multipart, payload
31 10
from .abc import AbstractStreamWriter
32 10
from .client_exceptions import (
33
    ClientConnectionError,
34
    ClientOSError,
35
    ClientResponseError,
36
    ContentTypeError,
37
    InvalidURL,
38
    ServerFingerprintMismatch,
39
)
40 10
from .formdata import FormData
41 10
from .helpers import (
42
    BaseTimerContext,
43
    BasicAuth,
44
    HeadersMixin,
45
    TimerNoop,
46
    is_expected_content_type,
47
    noop,
48
    reify,
49
    set_result,
50
)
51 10
from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter
52 10
from .http_parser import HAS_BROTLI
53 10
from .log import client_logger
54 10
from .streams import StreamReader
55 10
from .typedefs import (
56
    DEFAULT_JSON_DECODER,
57
    JSONDecoder,
58
    LooseCookies,
59
    LooseHeaders,
60
    RawHeaders,
61
)
62

63 10
try:
64 10
    import ssl
65 10
    from ssl import SSLContext
66
except ImportError:  # pragma: no cover
67
    ssl = None  # type: ignore[assignment]
68
    SSLContext = object  # type: ignore[misc,assignment]
69

70 10
try:
71 10
    import cchardet as chardet
72
except ImportError:  # pragma: no cover
73
    import chardet  # type: ignore[no-redef]
74

75

76 10
__all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
77

78

79
if TYPE_CHECKING:  # pragma: no cover
80
    from .client import ClientSession
81
    from .connector import Connection
82
    from .tracing import Trace
83

84

85 10
def _gen_default_accept_encoding() -> str:
    """Return the default ``Accept-Encoding`` header value.

    ``br`` is advertised only when ``HAS_BROTLI`` is true.
    """
    if HAS_BROTLI:
        return "gzip, deflate, br"
    return "gzip, deflate"
87

88

89 10
@dataclasses.dataclass(frozen=True)
class ContentDisposition:
    """Immutable, parsed form of a ``Content-Disposition`` header."""

    # Disposition type; may be None.
    type: Optional[str]
    # Read-only view of the header parameters.
    parameters: "MappingProxyType[str, str]"
    # File name derived from the parameters; may be None.
    filename: Optional[str]
94

95

96 10
@dataclasses.dataclass(frozen=True)
class RequestInfo:
    """Immutable summary of a request: URLs, method and headers."""

    # Request URL; ClientRequest passes its fragment-less ``url`` here.
    url: URL
    # HTTP method of the request.
    method: str
    # Read-only view of the request headers.
    headers: "CIMultiDictProxy[str]"
    # ClientRequest passes its ``original_url`` here.
    real_url: URL
102

103

104 10
class Fingerprint:
    """Expected digest of a peer certificate, checked against a transport.

    Only 32-byte (SHA-256) digests are accepted.  The 16-byte (MD5) and
    20-byte (SHA-1) lengths are listed in ``HASHFUNC_BY_DIGESTLEN`` only so
    that they can be rejected with a dedicated error message.
    """

    HASHFUNC_BY_DIGESTLEN = {
        16: md5,
        20: sha1,
        32: sha256,
    }

    def __init__(self, fingerprint: bytes) -> None:
        """Store the expected digest.

        Raises:
            ValueError: if the digest length matches no known hash function,
                or matches MD5 / SHA-1.
        """
        hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(len(fingerprint))
        if not hashfunc:
            raise ValueError("fingerprint has invalid length")
        if hashfunc in (md5, sha1):
            raise ValueError(
                "md5 and sha1 are insecure and not supported. Use sha256."
            )
        self._hashfunc = hashfunc
        self._fingerprint = fingerprint

    @property
    def fingerprint(self) -> bytes:
        """The expected digest given to the constructor."""
        return self._fingerprint

    def check(self, transport: asyncio.Transport) -> None:
        """Compare the peer certificate digest with the expected one.

        Does nothing when the transport reports no ``sslcontext``.

        Raises:
            ServerFingerprintMismatch: if the digest of the binary peer
                certificate differs from the stored fingerprint.
        """
        if not transport.get_extra_info("sslcontext"):
            return
        ssl_object = transport.get_extra_info("ssl_object")
        peer_cert = ssl_object.getpeercert(binary_form=True)
        actual = self._hashfunc(peer_cert).digest()
        if actual == self._fingerprint:
            return
        # peername may carry more than (host, port); ignore the extras.
        host, port, *_ = transport.get_extra_info("peername")
        raise ServerFingerprintMismatch(self._fingerprint, actual, host, port)
136

137

138 10
# Types accepted for an ``ssl`` argument.  Always a tuple, so the value has
# the same shape (and works with isinstance()) whether or not the ssl module
# could be imported.
if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
else:  # pragma: no cover
    # No ssl module: there is no SSLContext type to accept, but the plain
    # ``bool`` / ``None`` values allowed by the ``ssl`` parameter annotation
    # do not depend on it and must not be rejected.
    SSL_ALLOWED_TYPES = (bool, type(None))
142

143

144 10
@dataclasses.dataclass(frozen=True)
class ConnectionKey:
    """Immutable key describing the connection a request needs.

    The key includes the proxy and TLS settings in use, to prevent a
    wrong connection from being reused out of a pool.
    """

    host: str
    port: Optional[int]
    is_ssl: bool
    ssl: Union[SSLContext, None, bool, Fingerprint]
    proxy: Optional[URL]
    proxy_auth: Optional[BasicAuth]
    # Hash of the proxy header items, or None when there are none.
    proxy_headers_hash: Optional[int]
155

156

157 10
class ClientRequest:
158 10
    # Methods for which __init__ skips transfer-encoding handling when no
    # data is supplied.
    GET_METHODS = {
        hdrs.METH_GET,
        hdrs.METH_HEAD,
        hdrs.METH_OPTIONS,
        hdrs.METH_TRACE,
    }
    # Methods that get a default Content-Type in send().
    POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
    ALL_METHODS = GET_METHODS | POST_METHODS | {hdrs.METH_DELETE}

    # Headers added by update_auto_headers() unless already set or skipped.
    DEFAULT_HEADERS = {
        hdrs.ACCEPT: "*/*",
        hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
    }

    # Class-level defaults; instances overwrite them as the request is built.
    body = b""
    auth = None
    response = None

    _writer = None  # async task for streaming data
    _continue = None  # waiter future for '100 Continue' response

    # N.B.
    # Adding a __del__ method that closes self._writer doesn't make sense:
    # the writer task runs a bound method (write_bytes) and therefore keeps
    # a reference to self, so the finalizer will not be called until the
    # writer has finished.
183

184 10
    def __init__(
        self,
        method: str,
        url: URL,
        *,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        skip_auto_headers: Iterable[str] = frozenset(),
        data: Any = None,
        cookies: Optional[LooseCookies] = None,
        auth: Optional[BasicAuth] = None,
        version: http.HttpVersion = http.HttpVersion11,
        compress: Optional[str] = None,
        chunked: Optional[bool] = None,
        expect100: bool = False,
        loop: asyncio.AbstractEventLoop,
        response_class: Optional[Type["ClientResponse"]] = None,
        proxy: Optional[URL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        timer: Optional[BaseTimerContext] = None,
        session: Optional["ClientSession"] = None,
        ssl: Union[SSLContext, bool, Fingerprint, None] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        traces: Optional[List["Trace"]] = None,
    ):
        """Build the request state: URL, headers, cookies, auth and body.

        ``params`` are appended to the query already present in ``url``.
        The ``update_*`` helpers below run in a fixed order because later
        ones read what earlier ones stored (for example the body helpers
        inspect ``self.headers``).
        """
        assert isinstance(url, URL), url
        assert isinstance(proxy, (URL, type(None))), proxy
        # FIXME: session is None in tests only, need to fix tests
        # assert session is not None
        self._session = cast("ClientSession", session)
        if params:
            merged_query = MultiDict(url.query)
            merged_query.extend(url.with_query(params).query)
            url = url.with_query(merged_query)
        self.original_url = url
        self.url = url.with_fragment(None)
        self.method = method.upper()
        self.chunked = chunked
        self.compress = compress
        self.loop = loop
        self.length = None
        resolved_class = ClientResponse if response_class is None else response_class
        self.response_class = resolved_class  # type: Type[ClientResponse]
        self._timer = TimerNoop() if timer is None else timer
        self._ssl = ssl

        if loop.get_debug():
            # Remember where the request was created, starting at the caller.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        self.update_version(version)
        self.update_host(url)
        self.update_headers(headers)
        self.update_auto_headers(skip_auto_headers)
        self.update_cookies(cookies)
        self.update_content_encoding(data)
        self.update_auth(auth)
        self.update_proxy(proxy, proxy_auth, proxy_headers)

        self.update_body_from_data(data)
        if data or self.method not in self.GET_METHODS:
            self.update_transfer_encoding()
        self.update_expect_continue(expect100)
        self._traces = [] if traces is None else traces
254

255 10
    def is_ssl(self) -> bool:
        """Return True when the URL scheme is ``https`` or ``wss``."""
        scheme = self.url.scheme
        return scheme == "https" or scheme == "wss"
257

258 10
    @property
    def ssl(self) -> Union["SSLContext", None, bool, Fingerprint]:
        """The ``ssl`` value that was passed to the constructor."""
        return self._ssl
261

262 10
    @property
    def connection_key(self) -> ConnectionKey:
        """Build the ``ConnectionKey`` for this request.

        Proxy headers are represented by the hash of their items, or by
        None when there are no proxy headers.
        """
        proxy_headers = self.proxy_headers
        headers_hash = None  # type: Optional[int]
        if proxy_headers:
            headers_hash = hash(tuple(proxy_headers.items()))
        return ConnectionKey(
            host=self.host,
            port=self.port,
            is_ssl=self.is_ssl(),
            ssl=self.ssl,
            proxy=self.proxy,
            proxy_auth=self.proxy_auth,
            proxy_headers_hash=headers_hash,
        )
280

281 10
    @property
    def host(self) -> str:
        """Raw host of the request URL; asserted to be present."""
        raw_host = self.url.raw_host
        assert raw_host is not None
        return raw_host
286

287 10
    @property
    def port(self) -> Optional[int]:
        """Port of the request URL, as reported by ``URL.port``."""
        return self.url.port
290

291 10
    @property
    def request_info(self) -> RequestInfo:
        """Snapshot of URL, method and headers as a ``RequestInfo``.

        The headers are wrapped in a read-only proxy.
        """
        return RequestInfo(
            url=self.url,
            method=self.method,
            headers=CIMultiDictProxy(self.headers),
            real_url=self.original_url,
        )
295

296 10
    def update_host(self, url: URL) -> None:
        """Validate the URL host and pick up credentials embedded in the URL.

        Raises:
            InvalidURL: if ``url`` has no host.
        """
        if not url.raw_host:
            raise InvalidURL(url)

        # basic auth info
        user = url.user
        if user:
            self.auth = helpers.BasicAuth(user, url.password or "")
306

307 10
    def update_version(self, version: Union[http.HttpVersion, str]) -> None:
        """Store the HTTP version, parsing it first when given as a string.

        A string such as ``'1.1'`` becomes ``HttpVersion(1, 1)``; any
        non-string value is stored unchanged.

        Raises:
            ValueError: if the string is not of the form ``<int>.<int>``,
                including when it contains no ``"."`` at all.
        """
        if isinstance(version, str):
            try:
                # Unpacking raises ValueError when the "." separator is
                # missing, so an input like "1" is reported as a parse
                # error instead of escaping as an uncaught IndexError.
                major, minor = (part.strip() for part in version.split(".", 1))
                version = http.HttpVersion(int(major), int(minor))
            except ValueError:
                raise ValueError(
                    f"Can not parse http version number: {version}"
                ) from None
        self.version = version
321

322 10
    def update_headers(self, headers: Optional[LooseHeaders]) -> None:
        """Reset ``self.headers`` to a computed Host header plus ``headers``.

        The Host value is the URL's raw host (bracketed for IPv6), followed
        by ``:port`` when the port is set and is not the scheme default.
        A caller-supplied Host header replaces the computed one; every
        other supplied header is added, so repeated names are kept.

        ``headers`` may be any mapping or an iterable of ``(key, value)``
        pairs.
        """
        self.headers = CIMultiDict()  # type: CIMultiDict[str]

        # add host
        netloc = cast(str, self.url.raw_host)
        if helpers.is_ipv6_address(netloc):
            netloc = f"[{netloc}]"
        if self.url.port is not None and not self.url.is_default_port():
            netloc += ":" + str(self.url.port)
        self.headers[hdrs.HOST] = netloc

        if not headers:
            return

        # Every mapping goes through items().  Iterating a non-dict mapping
        # directly would yield its keys, and each key string would then be
        # unpacked as if it were a (key, value) pair.
        if isinstance(headers, (Mapping, MultiDictProxy, MultiDict)):
            headers = headers.items()  # type: ignore[assignment]

        for key, value in headers:  # type: ignore[misc]
            # A special case for Host header
            if key.lower() == "host":
                self.headers[key] = value
            else:
                self.headers.add(key, value)
344

345 10
    def update_auto_headers(self, skip_auto_headers: Iterable[str]) -> None:
        """Add the default headers that are neither already set nor skipped.

        The skipped names are stored on ``self.skip_auto_headers``.
        """
        self.skip_auto_headers = CIMultiDict(
            (name, None) for name in sorted(skip_auto_headers)
        )
        # Names that must not be auto-filled: already present or skipped.
        taken = self.headers.copy()
        taken.extend(self.skip_auto_headers)  # type: ignore[arg-type]

        for name, default in self.DEFAULT_HEADERS.items():
            if name in taken:
                continue
            self.headers.add(name, default)

        if hdrs.USER_AGENT not in taken:
            self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
358

359 10
    def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
        """Merge ``cookies`` into the request's Cookie header.

        Cookies already present in the header are loaded first; the
        header is then replaced by the combined cookie output.
        """
        if not cookies:
            return

        jar = SimpleCookie()  # type: SimpleCookie[str]
        if hdrs.COOKIE in self.headers:
            jar.load(self.headers.get(hdrs.COOKIE, ""))
            del self.headers[hdrs.COOKIE]

        pairs = cookies.items() if isinstance(cookies, Mapping) else cookies
        for name, value in pairs:  # type: ignore[misc]
            if not isinstance(value, Morsel):
                jar[name] = value  # type: ignore[assignment]
                continue
            # Preserve coded_value
            morsel = value.get(value.key, Morsel())
            morsel.set(value.key, value.value, value.coded_value)
            jar[name] = morsel

        self.headers[hdrs.COOKIE] = jar.output(header="", sep=";").strip()
383

384 10
    def update_content_encoding(self, data: Any) -> None:
        """Set the Content-Encoding header from ``self.compress``.

        Nothing happens without ``data``.  When compression is requested,
        a non-string ``compress`` value becomes ``"deflate"`` and chunked
        transfer is switched on.

        Raises:
            ValueError: if ``compress`` is set while a Content-Encoding
                header is already present.
        """
        if not data:
            return

        existing = self.headers.get(hdrs.CONTENT_ENCODING, "").lower()
        if existing and self.compress:
            raise ValueError(
                "compress can not be set if Content-Encoding header is set"
            )
        if existing or not self.compress:
            return

        if not isinstance(self.compress, str):
            self.compress = "deflate"
        self.headers[hdrs.CONTENT_ENCODING] = self.compress
        self.chunked = True  # enable chunked, no need to deal with length
400

401 10
    def update_transfer_encoding(self) -> None:
        """Reconcile ``self.chunked`` with the transfer/length headers.

        With ``self.chunked`` set, a Transfer-Encoding header is added;
        otherwise a Content-Length computed from ``self.body`` is added
        when none is present.

        Raises:
            ValueError: if ``self.chunked`` is set together with either a
                chunked Transfer-Encoding header or a Content-Length header.
        """
        te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()

        if "chunked" in te:
            if self.chunked:
                raise ValueError(
                    'chunked can not be set if "Transfer-Encoding: chunked" header is set'
                )
            return

        if self.chunked:
            if hdrs.CONTENT_LENGTH in self.headers:
                raise ValueError(
                    "chunked can not be set if Content-Length header is set"
                )
            self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
            return

        if hdrs.CONTENT_LENGTH not in self.headers:
            self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))
422

423 10
    def update_auth(self, auth: Optional[BasicAuth]) -> None:
        """Set the Authorization header from ``auth`` or ``self.auth``.

        Raises:
            TypeError: if the chosen value is not a ``BasicAuth``.
        """
        effective = self.auth if auth is None else auth
        if effective is None:
            return

        if not isinstance(effective, helpers.BasicAuth):
            raise TypeError("BasicAuth() tuple is required instead")

        self.headers[hdrs.AUTHORIZATION] = effective.encode()
434

435 10
    def update_body_from_data(self, body: Any) -> None:
        """Turn ``body`` into a payload and derive length/payload headers.

        ``FormData`` is rendered by calling it; anything the payload
        registry does not recognise is wrapped in ``FormData`` first.
        """
        if not body:
            return

        # FormData
        if isinstance(body, FormData):
            body = body()

        try:
            body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
        except payload.LookupError:
            body = FormData(body)()

        self.body = body

        # enable chunked encoding if needed
        if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
            size = body.size
            if size is None:
                self.chunked = True
            else:
                self.headers[hdrs.CONTENT_LENGTH] = str(size)

        # copy payload headers, never overriding set or skipped ones
        assert body.headers
        for key, value in body.headers.items():
            if key in self.headers or key in self.skip_auto_headers:
                continue
            self.headers[key] = value
468

469 10
    def update_expect_continue(self, expect: bool = False) -> None:
        """Prepare the '100 Continue' waiter future when it is needed.

        It is needed when ``expect`` is true (the Expect header is then
        set) or when the headers already carry ``Expect: 100-continue``.
        """
        if expect:
            self.headers[hdrs.EXPECT] = "100-continue"
        else:
            expect = self.headers.get(hdrs.EXPECT, "").lower() == "100-continue"

        if expect:
            self._continue = self.loop.create_future()
477

478 10
    def update_proxy(
        self,
        proxy: Optional[URL],
        proxy_auth: Optional[BasicAuth],
        proxy_headers: Optional[LooseHeaders],
    ) -> None:
        """Validate and store the proxy settings.

        Raises:
            ValueError: if the proxy scheme is not ``http`` or
                ``proxy_auth`` is set but is not a ``BasicAuth``.
        """
        if proxy and proxy.scheme != "http":
            raise ValueError("Only http proxies are supported")
        if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
            raise ValueError("proxy_auth must be None or BasicAuth() tuple")
        self.proxy, self.proxy_auth, self.proxy_headers = (
            proxy,
            proxy_auth,
            proxy_headers,
        )
491

492 10
    def keep_alive(self) -> bool:
        """Tell whether the connection should be kept alive.

        Below HTTP/1.0: never.  HTTP/1.0: only with
        ``Connection: keep-alive``.  Otherwise: unless ``Connection: close``.
        """
        if self.version < HttpVersion10:
            # keep alive not supported at all
            return False
        connection = self.headers.get(hdrs.CONNECTION)
        if self.version == HttpVersion10:
            # no headers means we close for Http 1.0
            return connection == "keep-alive"
        return connection != "close"
505

506 10
    async def write_bytes(
        self, writer: AbstractStreamWriter, conn: "Connection"
    ) -> None:
        """Write the request body through ``writer``, then EOF.

        When a '100 Continue' waiter exists, the writer is drained and
        the waiter awaited first.  Failures are not raised here; they are
        handed to the connection's protocol with ``set_exception``.
        """
        # 100 response
        if self._continue is not None:
            await writer.drain()
            await self._continue

        protocol = conn.protocol
        assert protocol is not None
        try:
            body = self.body
            if isinstance(body, payload.Payload):
                await body.write(writer)
            else:
                if isinstance(body, (bytes, bytearray)):
                    # A single bytes object is sent as one chunk.
                    body = self.body = (body,)  # type: ignore[assignment]

                for chunk in body:
                    await writer.write(chunk)  # type: ignore[arg-type]

            await writer.write_eof()
        except OSError as exc:
            wrapped = ClientOSError(
                exc.errno, "Can not write request body for %s" % self.url
            )
            wrapped.__context__ = exc
            wrapped.__cause__ = exc
            protocol.set_exception(wrapped)
        except asyncio.CancelledError as exc:
            if not conn.closed:
                protocol.set_exception(exc)
        except Exception as exc:
            protocol.set_exception(exc)
        finally:
            self._writer = None
542

543 10
    async def send(self, conn: "Connection") -> "ClientResponse":
        """Write the request head and start streaming the body.

        The status line and headers are written immediately.  The body is
        written by a task running ``write_bytes``, which is stored on
        ``self._writer``.

        Returns:
            The response object, also stored on ``self.response``.
        """
        # Specify request target:
        # - CONNECT request must send authority form URI
        # - not CONNECT proxy must send absolute form URI
        # - most common is origin form URI
        if self.method == hdrs.METH_CONNECT:
            connect_host = self.url.raw_host
            assert connect_host is not None
            if helpers.is_ipv6_address(connect_host):
                connect_host = f"[{connect_host}]"
            path = f"{connect_host}:{self.url.port}"
        elif self.proxy and not self.is_ssl():
            path = str(self.url)
        else:
            path = self.url.raw_path
            if self.url.raw_query_string:
                path += "?" + self.url.raw_query_string

        protocol = conn.protocol
        assert protocol is not None
        writer = StreamWriter(
            protocol,
            self.loop,
            on_chunk_sent=functools.partial(
                self._on_chunk_request_sent, self.method, self.url
            ),
            on_headers_sent=functools.partial(
                self._on_headers_request_sent, self.method, self.url
            ),
        )

        if self.compress:
            writer.enable_compression(self.compress)

        # Test truthiness rather than ``is not None``: an explicit
        # ``chunked=False`` was treated as "not chunked" when the headers
        # were built (Content-Length was set), so the body must not be
        # written with chunked framing.
        if self.chunked:
            writer.enable_chunking()

        # set default content-type
        if (
            self.method in self.POST_METHODS
            and hdrs.CONTENT_TYPE not in self.skip_auto_headers
            and hdrs.CONTENT_TYPE not in self.headers
        ):
            self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

        # set the connection header
        connection = self.headers.get(hdrs.CONNECTION)
        if not connection:
            if self.keep_alive():
                if self.version == HttpVersion10:
                    connection = "keep-alive"
            else:
                if self.version == HttpVersion11:
                    connection = "close"

        if connection is not None:
            self.headers[hdrs.CONNECTION] = connection

        # status + headers
        status_line = "{0} {1} HTTP/{2[0]}.{2[1]}".format(
            self.method, path, self.version
        )
        await writer.write_headers(status_line, self.headers)

        self._writer = self.loop.create_task(self.write_bytes(writer, conn))

        response_class = self.response_class
        assert response_class is not None
        self.response = response_class(
            self.method,
            self.original_url,
            writer=self._writer,
            continue100=self._continue,
            timer=self._timer,
            request_info=self.request_info,
            traces=self._traces,
            loop=self.loop,
            session=self._session,
        )
        return self.response
623

624 10
    async def close(self) -> None:
        """Wait for the body writer task, if any, then forget it."""
        writer_task = self._writer
        if writer_task is None:
            return
        try:
            await writer_task
        finally:
            self._writer = None
630

631 10
    def terminate(self) -> None:
        """Cancel the body writer task, if any, and forget it.

        The task is only cancelled while the loop is still open.
        """
        writer_task = self._writer
        if writer_task is None:
            return
        if not self.loop.is_closed():
            writer_task.cancel()
        self._writer = None
636

637 10
    async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
        """Report a sent body chunk to every trace, one after another."""
        for tracer in self._traces:
            await tracer.send_request_chunk_sent(method, url, chunk)
640

641 10
    async def _on_headers_request_sent(
        self, method: str, url: URL, headers: "CIMultiDict[str]"
    ) -> None:
        """Report the sent request headers to every trace, one after another."""
        for tracer in self._traces:
            await tracer.send_request_headers(method, url, headers)
646

647

648 10
class ClientResponse(HeadersMixin):
649

650
    # Taken from the Status-Line of the response; assigned in start().
    version = None  # HTTP-Version
    status = None  # type: int  # Status-Code
    reason = None  # Reason-Phrase

    content = None  # type: StreamReader  # Payload stream
    _headers = None  # type: CIMultiDictProxy[str]  # Response headers
    _raw_headers = None  # type: RawHeaders  # Response raw headers

    _connection = None  # current connection
    _source_traceback = None
    # Set up by ClientRequest after the ClientResponse object is created;
    # a post-init stage avoids changing the constructor signature.
    _closed = True  # lets __del__ run on a response that was not fully initialized
    _released = False
665

666 10
    def __init__(
        self,
        method: str,
        url: URL,
        *,
        writer: "asyncio.Task[None]",
        continue100: Optional["asyncio.Future[bool]"],
        timer: BaseTimerContext,
        request_info: RequestInfo,
        traces: List["Trace"],
        loop: asyncio.AbstractEventLoop,
        session: "ClientSession",
    ) -> None:
        """Store the request-side context of a response not yet started.

        The response is created closed; ``start()`` opens it.
        """
        assert isinstance(url, URL)
        super().__init__()

        self.method = method
        self.cookies = SimpleCookie()  # type: SimpleCookie[str]

        self._real_url = url
        self._url = url.with_fragment(None)
        self._body = None  # type: Optional[bytes]
        self._writer = writer  # type: Optional[asyncio.Task[None]]
        self._continue = continue100  # None by default
        self._closed = True
        self._history = ()  # type: Tuple[ClientResponse, ...]
        self._request_info = request_info
        self._timer = TimerNoop() if timer is None else timer
        self._cache = {}  # type: Dict[str, Any]
        self._traces = traces
        self._loop = loop
        # store a reference to session #1985
        self._session = session  # type: Optional[ClientSession]
        if loop.get_debug():
            # Remember where the response was created, starting at the caller.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))
701

702 10
    @reify
    def url(self) -> URL:
        """Response URL with the fragment removed."""
        return self._url
705

706 10
    @reify
    def real_url(self) -> URL:
        """URL exactly as passed to the constructor (fragment kept)."""
        return self._real_url
709

710 10
    @reify
    def host(self) -> str:
        """Host of the response URL; asserted to be present."""
        url_host = self._url.host
        assert url_host is not None
        return url_host
714

715 10
    @reify
    def headers(self) -> "CIMultiDictProxy[str]":
        """Response headers, as stored by ``start()``."""
        return self._headers
718

719 10
    @reify
    def raw_headers(self) -> RawHeaders:
        """Raw response headers, as stored by ``start()``."""
        return self._raw_headers
722

723 10
    @reify
    def request_info(self) -> RequestInfo:
        """The ``RequestInfo`` passed to the constructor."""
        return self._request_info
726

727 10
    @reify
    def content_disposition(self) -> Optional[ContentDisposition]:
        """Parsed Content-Disposition header, or None when it is absent."""
        header = self._headers.get(hdrs.CONTENT_DISPOSITION)
        if header is None:
            return None
        disposition_type, raw_params = multipart.parse_content_disposition(header)
        # Expose the parameters read-only.
        read_only_params = MappingProxyType(raw_params)
        return ContentDisposition(
            disposition_type,
            read_only_params,
            multipart.content_disposition_filename(read_only_params),
        )
736

737 10
    def __del__(self, _warnings: Any = warnings) -> None:
        """Release a connection still held by an unclosed response.

        When the loop is in debug mode, the leak is also reported through
        a ``ResourceWarning`` and the loop's exception handler.
        """
        if self._closed or self._connection is None:
            return

        self._connection.release()
        self._cleanup_writer()

        if not self._loop.get_debug():
            return
        _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, source=self)
        context = {"client_response": self, "message": "Unclosed response"}
        if self._source_traceback:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)
753

754 10
    def __repr__(self) -> str:
        """Return a status line followed by the headers, one per line.

        Non-ASCII characters in the reason phrase are backslash-escaped.
        """
        reason = self.reason
        if reason:
            reason = reason.encode("ascii", "backslashreplace").decode("ascii")
        summary = "<ClientResponse({}) [{} {}]>".format(
            str(self.url), self.status, reason
        )
        # Each part ends with a newline, exactly as print() would emit it.
        return summary + "\n" + str(self.headers) + "\n"
771

772 10
    @property
    def connection(self) -> Optional["Connection"]:
        """The connection currently held by this response, if any."""
        return self._connection
775

776 10
    @reify
    def history(self) -> Tuple["ClientResponse", ...]:
        """Tuple of earlier responses, if redirects occurred."""
        return self._history
780

781 10
    @reify
    def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
        """Parse the Link headers into a read-only multidict.

        Each entry is keyed by its ``rel`` parameter, or by the link
        target when ``rel`` is missing.  Its value holds the parsed
        parameters plus ``url``, the target joined onto the response URL.
        """
        joined = ", ".join(self.headers.getall("link", []))

        if not joined:
            return MultiDictProxy(MultiDict())

        links = MultiDict()  # type: MultiDict[MultiDictProxy[Union[str, URL]]]

        # Split only on commas that are followed by the "<" of a new link.
        for entry in re.split(r",(?=\s*<)", joined):
            entry_match = re.match(r"\s*<(.*)>(.*)", entry)
            if entry_match is None:  # pragma: no cover
                # the check exists to suppress mypy error
                continue
            url, params_str = entry_match.groups()

            link = MultiDict()  # type: MultiDict[Union[str, URL]]

            # The text before the first ";" is not a parameter.
            for param in params_str.split(";")[1:]:
                param_match = re.match(
                    r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M
                )
                if param_match is None:  # pragma: no cover
                    # the check exists to suppress mypy error
                    continue
                name, _, value, _ = param_match.groups()
                link.add(name, value)

            key = link.get("rel", url)  # type: ignore[assignment]
            link.add("url", self.url.join(URL(url)))
            links.add(key, MultiDictProxy(link))

        return MultiDictProxy(links)
816

817 10
    async def start(self, connection: "Connection") -> "ClientResponse":
        """Read the response head from ``connection`` and populate self.

        Messages with a 1xx status other than 101 are skipped; each one
        resolves the pending '100 Continue' waiter, if there is one.

        Raises:
            ClientResponseError: if the protocol raises
                ``HttpProcessingError`` while reading.
        """
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection

        with self._timer:
            while True:
                # read response
                try:
                    protocol = self._protocol
                    message, body_stream = await protocol.read()  # type: ignore[union-attr]
                except http.HttpProcessingError as exc:
                    raise ClientResponseError(
                        self.request_info,
                        self.history,
                        status=exc.code,
                        message=exc.message,
                        headers=exc.headers,
                    ) from exc

                code = message.code
                if not (100 <= code <= 199) or code == 101:
                    break

                if self._continue is not None:
                    set_result(self._continue, True)
                    self._continue = None

        # payload eof handler
        body_stream.on_eof(self._response_eof)

        # response status
        self.version = message.version
        self.status = message.code
        self.reason = message.reason

        # headers
        self._headers = message.headers  # type is CIMultiDictProxy
        self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes]

        # payload
        self.content = body_stream

        # cookies: a malformed Set-Cookie header is logged, not raised
        for set_cookie in self.headers.getall(hdrs.SET_COOKIE, ()):
            try:
                self.cookies.load(set_cookie)
            except CookieError as exc:
                client_logger.warning("Can not load response cookies: %s", exc)
        return self
867

868 10
    def _response_eof(self) -> None:
869 10
        if self._closed:
870 10
            return
871

872 10
        if self._connection is not None:
873
            # websocket, protocol could be None because
874
            # connection could be detached
875 10
            if (
876
                self._connection.protocol is not None
877
                and self._connection.protocol.upgraded
878
            ):
879 10
                return
880

881 10
            self._connection.release()
882 10
            self._connection = None
883

884 10
        self._closed = True
885 10
        self._cleanup_writer()
886

887 10
    @property
    def closed(self) -> bool:
        """Return the current value of the internal ``_closed`` flag."""
        is_closed = self._closed
        return is_closed
890

891 10
    def close(self) -> None:
        """Close the response and close (rather than release) its connection.

        Content is notified first if the response was not yet released.  The
        call is a no-op afterwards when the response is already closed; when
        the event loop is missing or closed only the ``_closed`` flag is set.
        """
        if not self._released:
            self._notify_content()
        if self._closed:
            return

        self._closed = True
        loop = self._loop
        if loop is None or loop.is_closed():
            return

        conn = self._connection
        if conn is not None:
            conn.close()
            self._connection = None
        self._cleanup_writer()
905

906 10
    def release(self) -> Any:
        """Release the response, handing its connection back via ``release()``.

        Content is notified first if the response was not yet released.  When
        the response is not already closed it is marked closed, the connection
        (if any) is released and the writer is cleaned up.  Always returns the
        result of ``noop()``.
        """
        if not self._released:
            self._notify_content()

        if not self._closed:
            self._closed = True
            conn = self._connection
            if conn is not None:
                conn.release()
                self._connection = None
            self._cleanup_writer()

        return noop()
919

920 10
    @property
    def ok(self) -> bool:
        """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.

        This is **not** a check for ``200 OK`` but a check that the response
        status is under 400.
        """
        return self.status < 400
928

929 10
    def raise_for_status(self) -> None:
        """Raise ``ClientResponseError`` when ``ok`` is false.

        The response is released before the error is raised.  Nothing happens
        when ``ok`` is true.
        """
        if self.ok:
            return

        # reason should always be not None for a started response
        assert self.reason is not None
        self.release()
        raise ClientResponseError(
            self.request_info,
            self.history,
            status=self.status,
            message=self.reason,
            headers=self.headers,
        )
941

942 10
    def _cleanup_writer(self) -> None:
943 10
        if self._writer is not None:
944 10
            self._writer.cancel()
945 10
        self._writer = None
946 10
        self._session = None
947

948 10
    def _notify_content(self) -> None:
949 10
        content = self.content
950 10
        if content and content.exception() is None:
951 10
            content.set_exception(ClientConnectionError("Connection closed"))
952 10
        self._released = True
953

954 10
    async def wait_for_close(self) -> None:
        """Wait for the writer (if any) to finish, then release the response.

        The writer reference is cleared even when awaiting it raises; in that
        case the exception propagates and ``release()`` is not called.
        """
        writer = self._writer
        if writer is not None:
            try:
                await writer
            finally:
                self._writer = None
        self.release()
961

962 10
    async def read(self) -> bytes:
        """Read response payload.

        The body is read from ``content`` once and cached; every trace in
        ``_traces`` is then sent the whole body as a received chunk.  If
        anything fails during that first read the response is closed and the
        exception re-raised.

        Raises ``ClientConnectionError`` when the body is already cached but
        the response has been released.
        """
        if self._body is not None:
            if self._released:
                raise ClientConnectionError("Connection closed")
            return self._body

        try:
            self._body = await self.content.read()
            for tracer in self._traces:
                await tracer.send_response_chunk_received(
                    self.method, self.url, self._body
                )
        except BaseException:
            self.close()
            raise

        return self._body
978

979 10
    def get_encoding(self) -> str:
        """Return the name of the encoding to use for decoding the body.

        Resolution order:

        1. The ``charset`` parameter of the ``Content-Type`` header, when the
           codec registry knows it.
        2. ``"utf-8"`` for ``application/json`` and ``application/rdap``
           subtypes.
        3. Whatever ``chardet.detect`` reports for the already-read body.
        4. ``"utf-8"`` when none of the above produced a value.

        Raises ``RuntimeError`` when step 3 is needed but the body has not
        been read yet.
        """
        ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
        mimetype = helpers.parse_mimetype(ctype)

        encoding = mimetype.parameters.get("charset")
        if encoding:
            try:
                codecs.lookup(encoding)
            except (LookupError, ValueError):
                # LookupError: the codec name is unknown.
                # ValueError: the name cannot even be handed to the registry
                # (e.g. it contains an embedded NUL character).  Either way
                # the server-supplied charset is unusable, so fall through to
                # the guesses below instead of failing the decode.
                encoding = None
        if not encoding:
            if mimetype.type == "application" and (
                mimetype.subtype == "json" or mimetype.subtype == "rdap"
            ):
                # RFC 7159 states that the default encoding is UTF-8.
                # RFC 7483 defines application/rdap+json
                encoding = "utf-8"
            elif self._body is None:
                raise RuntimeError(
                    "Cannot guess the encoding of " "a not yet read body"
                )
            else:
                encoding = chardet.detect(self._body)["encoding"]
        if not encoding:
            encoding = "utf-8"

        return encoding
1006

1007 10
    async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
        """Read response payload and decode.

        ``encoding`` defaults to the result of ``get_encoding()``; ``errors``
        is passed through to ``bytes.decode``.
        """
        if self._body is None:
            await self.read()

        codec = self.get_encoding() if encoding is None else encoding
        return self._body.decode(codec, errors=errors)  # type: ignore[union-attr]
1016

1017 10
    async def json(
        self,
        *,
        encoding: Optional[str] = None,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        content_type: Optional[str] = "application/json",
    ) -> Any:
        """Read and decodes JSON response.

        The body is read if necessary, decoded with ``encoding`` (or the
        result of ``get_encoding()`` when ``encoding`` is ``None``) and passed
        to ``loads``.

        Raises ``ContentTypeError`` when ``content_type`` is truthy and the
        response's ``Content-Type`` header does not satisfy
        ``is_expected_content_type``.  Pass a falsy ``content_type`` to skip
        that check.
        """
        if self._body is None:
            await self.read()

        if content_type:
            actual_type = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
            if not is_expected_content_type(actual_type, content_type):
                error_text = (
                    "Attempt to decode JSON with unexpected mimetype: %s" % actual_type
                )
                raise ContentTypeError(
                    self.request_info,
                    self.history,
                    message=error_text,
                    headers=self.headers,
                )

        codec = self.get_encoding() if encoding is None else encoding
        return loads(self._body.decode(codec))  # type: ignore[union-attr]
1044

1045 10
    async def __aenter__(self) -> "ClientResponse":
1046 10
        return self
1047

1048 10
    async def __aexit__(
1049
        self,
1050
        exc_type: Optional[Type[BaseException]],
1051
        exc_val: Optional[BaseException],
1052
        exc_tb: Optional[TracebackType],
1053
    ) -> None:
1054
        # similar to _RequestContextManager, we do not need to check
1055
        # for exceptions, response object can close connection
1056
        # if state is broken
1057 10
        self.release()

Read our documentation on viewing source code .

Loading